import email.message
import email.parser
import logging
import os
import zipfile
from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional

from pip._vendor import pkg_resources
from pip._vendor.packaging.requirements import Requirement
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
from pip._vendor.packaging.version import parse as parse_version

from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
from pip._internal.utils.egg_link import egg_link_path_from_location
from pip._internal.utils.misc import display_path, normalize_path
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file

from .base import (
    BaseDistribution,
    BaseEntryPoint,
    BaseEnvironment,
    DistributionVersion,
    InfoPath,
    Wheel,
)

__all__ = ["NAME", "Distribution", "Environment"]

logger = logging.getLogger(__name__)

NAME = "pkg_resources"


class EntryPoint(NamedTuple):
    name: str
    value: str
    group: str


class InMemoryMetadata:
    """IMetadataProvider that reads metadata files from a dictionary.

    This also maps metadata decoding exceptions to our internal exception type.
    """

    def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
        self._metadata = metadata
        self._wheel_name = wheel_name

    def has_metadata(self, name: str) -> bool:
        return name in self._metadata

    def get_metadata(self, name: str) -> str:
        try:
            return self._metadata[name].decode()
        except UnicodeDecodeError as e:
            # Augment the default error with the origin of the file.
            raise UnsupportedWheel(
                f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
            )

    def get_metadata_lines(self, name: str) -> Iterable[str]:
        return pkg_resources.yield_lines(self.get_metadata(name))

    def metadata_isdir(self, name: str) -> bool:
        return False

    def metadata_listdir(self, name: str) -> List[str]:
        return []

    def run_script(self, script_name: str, namespace: str) -> None:
        pass


class Distribution(BaseDistribution):
    def __init__(self, dist: pkg_resources.Distribution) -> None:
        self._dist = dist

    @classmethod
    def from_directory(cls, directory: str) -> BaseDistribution:
        dist_dir = directory.rstrip(os.sep)

        # Build a PathMetadata object, from path to metadata. :wink:
        base_dir, dist_dir_name = os.path.split(dist_dir)
        metadata = pkg_resources.PathMetadata(base_dir, dist_dir)

        # Determine the correct Distribution object type.
        if dist_dir.endswith(".egg-info"):
            dist_cls = pkg_resources.Distribution
            dist_name = os.path.splitext(dist_dir_name)[0]
        else:
            assert dist_dir.endswith(".dist-info")
            dist_cls = pkg_resources.DistInfoDistribution
            dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]

        dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
        return cls(dist)

    @classmethod
    def from_metadata_file_contents(
        cls,
        metadata_contents: bytes,
        filename: str,
        project_name: str,
    ) -> BaseDistribution:
        metadata_dict = {
            "METADATA": metadata_contents,
        }
        dist = pkg_resources.DistInfoDistribution(
            location=filename,
            metadata=InMemoryMetadata(metadata_dict, filename),
            project_name=project_name,
        )
        return cls(dist)

    @classmethod
    def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
        try:
            with wheel.as_zipfile() as zf:
                info_dir, _ = parse_wheel(zf, name)
                metadata_dict = {
                    path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
                    for path in zf.namelist()
                    if path.startswith(f"{info_dir}/")
                }
        except zipfile.BadZipFile as e:
            raise InvalidWheel(wheel.location, name) from e
        except UnsupportedWheel as e:
            raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
        dist = pkg_resources.DistInfoDistribution(
            location=wheel.location,
            metadata=InMemoryMetadata(metadata_dict, wheel.location),
            project_name=name,
        )
        return cls(dist)

    @property
    def location(self) -> Optional[str]:
        return self._dist.location

    @property
    def installed_location(self) -> Optional[str]:
        egg_link = egg_link_path_from_location(self.raw_name)
        if egg_link:
            location = egg_link
        elif self.location:
            location = self.location
        else:
            return None
        return normalize_path(location)

    @property
    def info_location(self) -> Optional[str]:
        return self._dist.egg_info

    @property
    def installed_by_distutils(self) -> bool:
        # A distutils-installed distribution is provided by FileMetadata. This
        # provider has a "path" attribute not present anywhere else. Not the
        # best introspection logic, but pip has been doing this for a long time.
        try:
            return bool(self._dist._provider.path)
        except AttributeError:
            return False

    @property
    def canonical_name(self) -> NormalizedName:
        return canonicalize_name(self._dist.project_name)

    @property
    def version(self) -> DistributionVersion:
        return parse_version(self._dist.version)

    def is_file(self, path: InfoPath) -> bool:
        return self._dist.has_metadata(str(path))

    def iter_distutils_script_names(self) -> Iterator[str]:
        yield from self._dist.metadata_listdir("scripts")

    def read_text(self, path: InfoPath) -> str:
        name = str(path)
        if not self._dist.has_metadata(name):
            raise FileNotFoundError(name)
        content = self._dist.get_metadata(name)
        if content is None:
            raise NoneMetadataError(self, name)
        return content

    def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
        for group, entries in self._dist.get_entry_map().items():
            for name, entry_point in entries.items():
                name, _, value = str(entry_point).partition("=")
                yield EntryPoint(name=name.strip(), value=value.strip(), group=group)

    def _metadata_impl(self) -> email.message.Message:
        """
        :raises NoneMetadataError: if the distribution reports `has_metadata()`
            True but `get_metadata()` returns None.
        """
        if isinstance(self._dist, pkg_resources.DistInfoDistribution):
            metadata_name = "METADATA"
        else:
            metadata_name = "PKG-INFO"
        try:
            metadata = self.read_text(metadata_name)
        except FileNotFoundError:
            if self.location:
                displaying_path = display_path(self.location)
            else:
                displaying_path = repr(self.location)
            logger.warning("No metadata found in %s", displaying_path)
            metadata = ""
        feed_parser = email.parser.FeedParser()
        feed_parser.feed(metadata)
        return feed_parser.close()

    def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
        if extras:  # pkg_resources raises on invalid extras, so we sanitize.
            extras = frozenset(pkg_resources.safe_extra(e) for e in extras)
            extras = extras.intersection(self._dist.extras)
        return self._dist.requires(extras)

    def iter_provided_extras(self) -> Iterable[str]:
        return self._dist.extras

    def is_extra_provided(self, extra: str) -> bool:
        return pkg_resources.safe_extra(extra) in self._dist.extras


class Environment(BaseEnvironment):
    def __init__(self, ws: pkg_resources.WorkingSet) -> None:
        self._ws = ws

    @classmethod
    def default(cls) -> BaseEnvironment:
        return cls(pkg_resources.working_set)

    @classmethod
    def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
        return cls(pkg_resources.WorkingSet(paths))

    def _iter_distributions(self) -> Iterator[BaseDistribution]:
        for dist in self._ws:
            yield Distribution(dist)

    def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Find a distribution matching the ``name`` in the environment.

        This searches from *all* distributions available in the environment, to
        match the behavior of ``pkg_resources.get_distribution()``.
        """
        canonical_name = canonicalize_name(name)
        for dist in self.iter_all_distributions():
            if dist.canonical_name == canonical_name:
                return dist
        return None

    def get_distribution(self, name: str) -> Optional[BaseDistribution]:
        # Search the distribution by looking through the working set.
        dist = self._search_distribution(name)
        if dist:
            return dist

        # If distribution could not be found, call working_set.require to
        # update the working set, and try to find the distribution again.
        # This might happen for e.g. when you install a package twice, once
        # using setup.py develop and again using setup.py install. Now when
        # running pip uninstall twice, the package gets removed from the
        # working set in the first uninstall, so we have to populate the
        # working set again so that pip knows about it and the packages gets
        # picked up and is successfully uninstalled the second time too.
        try:
            # We didn't pass in any version specifiers, so this can never
            # raise pkg_resources.VersionConflict.
            self._ws.require(name)
        except pkg_resources.DistributionNotFound:
            return None
        return self._search_distribution(name)