import logging
import optparse
import os
import re
import shlex
import urllib.parse
from optparse import Values
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Optional,
Tuple,
)
from pip._internal.cli import cmdoptions
from pip._internal.exceptions import InstallationError, RequirementsFileParseError
from pip._internal.models.search_scope import SearchScope
from pip._internal.network.session import PipSession
from pip._internal.network.utils import raise_for_status
from pip._internal.utils.encoding import auto_decode
from pip._internal.utils.urls import get_url_scheme
if TYPE_CHECKING:
from typing import NoReturn
from pip._internal.index.package_finder import PackageFinder
# Names exported by a star-import of this module.
__all__ = ["parse_requirements"]
# A stream of ``(line_number, line_text)`` pairs, as produced by ``preprocess``.
ReqFileLines = Iterable[Tuple[int, str]]
# A callable that splits one line into ``(requirement_text, parsed_options)``.
LineParser = Callable[[str], Tuple[str, Values]]
# Matches a leading ``http:``, ``https:`` or ``file:`` scheme (case-insensitive).
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
# Matches a ``#`` comment that starts the line or follows whitespace, up to
# the end of the line.
COMMENT_RE = re.compile(r"(^|\s+)#.*$")
# Matches ``${NAME}`` where NAME consists of ``A-Z``, ``0-9`` and ``_``.
# Group ``var`` is the whole placeholder, group ``name`` the bare name.
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")
# Option factories registered on the per-line parser (see ``build_parser``).
# Each entry is called with no arguments to create an ``optparse.Option``.
SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [
cmdoptions.index_url,
cmdoptions.extra_index_url,
cmdoptions.no_index,
cmdoptions.constraints,
cmdoptions.requirements,
cmdoptions.editable,
cmdoptions.find_links,
cmdoptions.no_binary,
cmdoptions.only_binary,
cmdoptions.prefer_binary,
cmdoptions.require_hashes,
cmdoptions.pre,
cmdoptions.trusted_host,
cmdoptions.use_new_feature,
]
# Option factories that are also registered on the per-line parser and whose
# values are copied onto non-editable requirements
# (see ``handle_requirement_line``).
SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [
cmdoptions.global_options,
cmdoptions.hash,
cmdoptions.config_settings,
]
# Option factories whose values are copied onto editable requirements.
SUPPORTED_OPTIONS_EDITABLE_REQ: List[Callable[..., optparse.Option]] = [
cmdoptions.config_settings,
]
# ``dest`` attribute names of the options above, obtained by instantiating
# each factory once at import time.
SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [
str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ
]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ParsedRequirement:
    """A single requirement extracted from a requirements file.

    Plain attribute container; every constructor argument is stored
    unchanged under an attribute of the same name.
    """

    def __init__(
        self,
        requirement: str,
        is_editable: bool,
        comes_from: str,
        constraint: bool,
        options: Optional[Dict[str, Any]] = None,
        line_source: Optional[str] = None,
    ) -> None:
        # What is required, and whether it was given as an editable.
        self.requirement, self.is_editable = requirement, is_editable
        # Human-readable description of where the requirement was found.
        self.comes_from = comes_from
        # Per-requirement option values (may be None).
        self.options = options
        # True when the requirement was read as a constraint.
        self.constraint = constraint
        # Optional "line N of FILE" style location string.
        self.line_source = line_source
class ParsedLine:
    """One logical line of a requirements file after option parsing.

    ``is_requirement`` is always set. ``is_editable`` and ``requirement``
    are only set when the line actually carries a requirement, i.e. when
    ``args`` is non-empty or ``opts.editables`` is non-empty.
    """

    def __init__(
        self,
        filename: str,
        lineno: int,
        args: str,
        opts: Values,
        constraint: bool,
    ) -> None:
        self.filename = filename
        self.lineno = lineno
        self.opts = opts
        self.constraint = constraint

        if args:
            # Positional text wins: a regular (non-editable) requirement.
            target, editable = args, False
        elif opts.editables:
            # No positional text, but an editable option was supplied;
            # only its first value is used.
            target, editable = opts.editables[0], True
        else:
            # Pure option line: no requirement attributes are defined.
            self.is_requirement = False
            return

        self.is_requirement = True
        self.is_editable = editable
        self.requirement = target
def parse_requirements(
    filename: str,
    session: PipSession,
    finder: Optional["PackageFinder"] = None,
    options: Optional[optparse.Values] = None,
    constraint: bool = False,
) -> Generator[ParsedRequirement, None, None]:
    """Parse a requirements file and yield the requirements it contains.

    :param filename: Path or URL of the requirements file.
    :param session: Session passed to the file parser and to ``handle_line``.
    :param finder: Optional finder; option lines may update it.
    :param options: Optional global options; option lines may update them.
    :param constraint: Whether the file is being read as a constraints file.
    :return: A generator of ``ParsedRequirement`` objects. Lines for which
        ``handle_line`` returns ``None`` produce nothing.
    """
    file_parser = RequirementsFileParser(session, get_line_parser(finder))

    for parsed_line in file_parser.parse(filename, constraint):
        requirement = handle_line(
            parsed_line, options=options, finder=finder, session=session
        )
        if requirement is None:
            continue
        yield requirement
def preprocess(content: str) -> ReqFileLines:
    """Turn raw file text into a stream of cleaned, numbered lines.

    Lines are numbered starting at 1, then passed in order through
    ``join_lines``, ``ignore_comments`` and ``expand_env_variables``.

    :param content: Full text of a requirements file.
    :return: An iterable of ``(line_number, line)`` pairs.
    """
    numbered: ReqFileLines = enumerate(content.splitlines(), start=1)
    # Order matters: continuation joining, then comment removal, then
    # environment-variable expansion.
    return expand_env_variables(ignore_comments(join_lines(numbered)))
def handle_requirement_line(
    line: ParsedLine,
    options: Optional[optparse.Values] = None,
) -> ParsedRequirement:
    """Build a ``ParsedRequirement`` from a line that carries a requirement.

    :param line: Parsed line; ``line.is_requirement`` must be true.
    :param options: Accepted for interface compatibility; not read here.
    :return: The requirement, with the truthy per-requirement option values
        that are supported for its kind (editable or not).
    """
    # Preserve where the requirement came from, for later messages.
    origin = "{} {} (line {})".format(
        "-c" if line.constraint else "-r",
        line.filename,
        line.lineno,
    )

    assert line.is_requirement

    # Editable and regular requirements accept different option sets.
    allowed_dests = (
        SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
        if line.is_editable
        else SUPPORTED_OPTIONS_REQ_DEST
    )

    # Keep only supported options that are present with a truthy value.
    parsed_values = line.opts.__dict__
    req_options = {
        dest: parsed_values[dest]
        for dest in allowed_dests
        if parsed_values.get(dest)
    }

    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=origin,
        constraint=line.constraint,
        options=req_options,
        line_source=f"line {line.lineno} of {line.filename}",
    )
def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: Optional["PackageFinder"] = None,
    options: Optional[optparse.Values] = None,
    session: Optional[PipSession] = None,
) -> None:
    """Apply an option-only line to the global options, finder and session.

    :param opts: Options parsed from the line.
    :param filename: File the line came from (used for messages and for
        resolving a relative find-links path).
    :param lineno: Line number within ``filename``.
    :param finder: If given, its search scope and flags are updated.
    :param options: If given, hash and feature settings are propagated to it.
    :param session: If given (together with ``finder``), its index URLs and
        trusted hosts are updated.
    """
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # Propagate file-level settings up to the caller's options.
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            # Append one at a time so duplicates within the line are
            # skipped as well.
            for feature in opts.features_enabled:
                if feature not in options.features_enabled:
                    options.features_enabled.append(feature)

    # Everything below configures the finder; nothing to do without one.
    if not finder:
        return

    # NOTE: these are the finder's own lists and are modified in place below.
    find_links = finder.find_links
    index_urls = finder.index_urls
    no_index = finder.search_scope.no_index

    if opts.no_index is True:
        no_index = True
        index_urls = []
    if opts.index_url and not no_index:
        index_urls = [opts.index_url]
    if opts.extra_index_urls and not no_index:
        index_urls.extend(opts.extra_index_urls)

    if opts.find_links:
        # Only the first value is used. Prefer the path relative to the
        # requirements file when that path exists on disk.
        link = opts.find_links[0]
        beside_reqs_file = os.path.join(
            os.path.dirname(os.path.abspath(filename)), link
        )
        find_links.append(
            beside_reqs_file if os.path.exists(beside_reqs_file) else link
        )

    if session:
        session.update_index_urls(index_urls)

    finder.search_scope = SearchScope(
        find_links=find_links,
        index_urls=index_urls,
        no_index=no_index,
    )

    if opts.pre:
        finder.set_allow_all_prereleases()

    if opts.prefer_binary:
        finder.set_prefer_binary()

    if session:
        source = f"line {lineno} of {filename}"
        for host in opts.trusted_hosts or []:
            session.add_trusted_host(host, source=source)
def handle_line(
    line: ParsedLine,
    options: Optional[optparse.Values] = None,
    finder: Optional["PackageFinder"] = None,
    session: Optional[PipSession] = None,
) -> Optional[ParsedRequirement]:
    """Dispatch one parsed line to the requirement or option handler.

    :param line: The parsed line.
    :param options: Global options, forwarded to the handlers.
    :param finder: Finder, forwarded to the option handler.
    :param session: Session, forwarded to the option handler.
    :return: A ``ParsedRequirement`` for requirement lines, otherwise
        ``None`` (the option line is applied as a side effect).
    """
    if not line.is_requirement:
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None

    return handle_requirement_line(line, options)
class RequirementsFileParser:
    """Read a requirements file and yield its lines as ``ParsedLine`` objects.

    Lines that reference a nested requirements or constraints file are not
    yielded themselves; the referenced file is parsed in their place.
    """

    def __init__(
        self,
        session: PipSession,
        line_parser: LineParser,
    ) -> None:
        """Store the session used to fetch files and the per-line parser."""
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse ``filename`` (and any files it includes), yielding lines.

        :param filename: Path or URL of the file to parse.
        :param constraint: Whether lines of this file are constraints.
        """
        yield from self._parse_and_recurse(filename, constraint)

    def _parse_and_recurse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Yield lines of ``filename``, descending into nested files."""
        for line in self._parse_file(filename, constraint):
            if not line.is_requirement and (
                line.opts.requirements or line.opts.constraints
            ):
                # The line points at a nested file. A requirements reference
                # takes precedence over a constraints reference, and only the
                # first value is followed.
                if line.opts.requirements:
                    req_path = line.opts.requirements[0]
                    nested_constraint = False
                else:
                    req_path = line.opts.constraints[0]
                    nested_constraint = True

                if SCHEME_RE.search(filename):
                    # The including file is a URL: resolve relative to it.
                    req_path = urllib.parse.urljoin(filename, req_path)
                elif not SCHEME_RE.search(req_path):
                    # Both are local paths: resolve relative to the directory
                    # of the including file.
                    req_path = os.path.join(
                        os.path.dirname(filename),
                        req_path,
                    )
                # Otherwise the nested reference is a URL and is used as-is.

                yield from self._parse_and_recurse(req_path, nested_constraint)
            else:
                yield line

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Fetch, preprocess and parse a single file (no recursion).

        :raises RequirementsFileParseError: If the line parser rejects a
            line. The message includes the offending line, and the original
            ``OptionParsingError`` is attached as the cause.
        """
        _, content = get_file_content(filename, self._session)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                msg = f"Invalid requirement: {line}\n{e.msg}"
                # Chain explicitly so the traceback reports the option error
                # as the direct cause rather than an incidental failure.
                raise RequirementsFileParseError(msg) from e

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )
def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser:
    """Return a function that parses one requirements-file line.

    :param finder: Optional finder; when given, its ``format_control`` is
        used as the default for each parsed line.
    :return: A callable mapping a line to ``(args_str, opts)``.
    """

    def parse_line(line: str) -> Tuple[str, Values]:
        # A fresh parser is built for every line so that option values
        # parsed from one line are not carried over to the next.
        parser = build_parser()
        defaults = parser.get_default_values()
        defaults.index_url = None
        if finder:
            defaults.format_control = finder.format_control

        args_str, options_str = break_args_options(line)

        try:
            option_tokens = shlex.split(options_str)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {options_str}") from e

        # Leftover positional arguments from optparse are ignored.
        parsed_opts = parser.parse_args(option_tokens, defaults)[0]
        return args_str, parsed_opts

    return parse_line
def break_args_options(line: str) -> Tuple[str, str]:
    """Split a line into its requirement part and its options part.

    The line is split on single spaces. Everything before the first token
    that starts with ``-`` is the requirement text; that token and all that
    follow are the options text.

    :param line: A preprocessed requirements-file line.
    :return: ``(args, options)``, each re-joined with single spaces. Either
        part may be the empty string.
    """
    tokens = line.split(" ")
    # Index of the first option-looking token. A ``--long`` option also
    # starts with ``-``, so one prefix check covers both forms.
    split_at = next(
        (index for index, token in enumerate(tokens) if token.startswith("-")),
        len(tokens),
    )
    return " ".join(tokens[:split_at]), " ".join(tokens[split_at:])
class OptionParsingError(Exception):
    """Raised when the options on a requirements-file line cannot be parsed."""

    def __init__(self, msg: str) -> None:
        # Kept on ``msg`` so callers can embed it in their own error text.
        self.msg = msg
def build_parser() -> optparse.OptionParser:
    """Create an option parser for a single requirements-file line.

    :return: An ``optparse.OptionParser`` (without a help option) that knows
        every option in ``SUPPORTED_OPTIONS`` and ``SUPPORTED_OPTIONS_REQ``
        and raises ``OptionParsingError`` instead of exiting.
    """
    parser = optparse.OptionParser(add_help_option=False)

    for make_option in (*SUPPORTED_OPTIONS, *SUPPORTED_OPTIONS_REQ):
        parser.add_option(make_option())

    # Replace the parser's ``exit`` so a parse failure surfaces as an
    # exception. The function is stored on the instance, so it is called
    # unbound: the first positional argument lands in ``self``.
    def parser_exit(self: Any, msg: str) -> "NoReturn":
        raise OptionParsingError(msg)

    parser.exit = parser_exit

    return parser
def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    """Join lines that end with a backslash to the line(s) that follow.

    A joined line is reported under the number of its first physical line.
    Comment lines never continue, even if they end with a backslash.

    :param lines_enum: ``(line_number, line)`` pairs.
    :return: ``(line_number, line)`` pairs with continuations merged.
    """
    first_lineno = None
    pending: List[str] = []

    for lineno, text in lines_enum:
        is_comment = COMMENT_RE.match(text)

        if text.endswith("\\") and not is_comment:
            # Continuation: remember where it started and keep collecting.
            if not pending:
                first_lineno = lineno
            pending.append(text.strip("\\"))
            continue

        if is_comment:
            # Prefix a space so the comment is still recognised as one
            # after it has been appended to preceding text.
            text = " " + text

        if not pending:
            yield lineno, text
            continue

        # This line terminates a pending continuation.
        pending.append(text)
        assert first_lineno is not None
        yield first_lineno, "".join(pending)
        pending = []

    # The input ended while a continuation was still open.
    if pending:
        assert first_lineno is not None
        yield first_lineno, "".join(pending)
def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """Remove comments and surrounding whitespace; drop lines left empty.

    :param lines_enum: ``(line_number, line)`` pairs.
    :return: ``(line_number, line)`` pairs for the non-empty remainder.
    """
    for lineno, raw in lines_enum:
        cleaned = COMMENT_RE.sub("", raw).strip()
        if not cleaned:
            continue
        yield lineno, cleaned
def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Substitute ``${NAME}`` placeholders with environment variable values.

    Placeholders whose variable is unset or empty are left untouched.

    :param lines_enum: ``(line_number, line)`` pairs.
    :return: ``(line_number, line)`` pairs with placeholders expanded.
    """
    for lineno, text in lines_enum:
        # The placeholders are collected once, from the unexpanded line.
        for placeholder, name in ENV_VAR_RE.findall(text):
            replacement = os.getenv(name)
            if replacement:
                text = text.replace(placeholder, replacement)

        yield lineno, text
def get_file_content(url: str, session: PipSession) -> Tuple[str, str]:
    """Return the location and text content of a requirements file.

    :param url: A local path, or an ``http``/``https``/``file`` URL.
    :param session: Session used to fetch ``url`` when it has one of the
        schemes above.
    :return: ``(location, content)``. For URLs the location is the response
        URL; for bare paths it is ``url`` unchanged.
    :raises InstallationError: If a bare path cannot be opened or read. The
        underlying ``OSError`` is attached as the cause.
    """
    scheme = get_url_scheme(url)

    if scheme in ["http", "https", "file"]:
        # All three schemes are fetched through the session.
        resp = session.get(url)
        raise_for_status(resp)
        return resp.url, resp.text

    # Anything else is treated as a bare filesystem path. The bytes are
    # handed to ``auto_decode`` to produce text.
    try:
        with open(url, "rb") as f:
            content = auto_decode(f.read())
    except OSError as exc:
        # Chain explicitly so the original OS error is kept as the cause.
        raise InstallationError(f"Could not open requirements file: {exc}") from exc
    return url, content