import functools
import sys
import threading
import time
import typing as t
import warnings
from abc import ABC, abstractmethod
from concurrent import futures
from inspect import iscoroutinefunction
from .retry import retry_base from .retry import retry_all from .retry import retry_always from .retry import retry_any from .retry import retry_if_exception from .retry import retry_if_exception_type from .retry import retry_if_exception_cause_type from .retry import retry_if_not_exception_type from .retry import retry_if_not_result from .retry import retry_if_result from .retry import retry_never from .retry import retry_unless_exception_type from .retry import retry_if_exception_message from .retry import retry_if_not_exception_message
from .nap import sleep from .nap import sleep_using_event
from .stop import stop_after_attempt from .stop import stop_after_delay from .stop import stop_all from .stop import stop_any from .stop import stop_never from .stop import stop_when_event_set
from .wait import wait_chain from .wait import wait_combine from .wait import wait_exponential from .wait import wait_fixed from .wait import wait_incrementing from .wait import wait_none from .wait import wait_random from .wait import wait_random_exponential from .wait import wait_random_exponential as wait_full_jitter from .wait import wait_exponential_jitter
from .before import before_log from .before import before_nothing
from .after import after_log from .after import after_nothing
from .before_sleep import before_sleep_log from .before_sleep import before_sleep_nothing
tornado = None
if t.TYPE_CHECKING:
import types
from .retry import RetryBaseT
from .stop import StopBaseT
from .wait import WaitBaseT
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Any])
class TryAgain(Exception):
NO_RESULT = object()
class DoAttempt:
pass
class DoSleep(float):
pass
class BaseAction:
REPR_FIELDS: t.Sequence[str] = ()
NAME: t.Optional[str] = None
def __repr__(self) -> str:
state_str = ", ".join(f"{field}={getattr(self, field)!r}" for field in self.REPR_FIELDS)
return f"{self.__class__.__name__}({state_str})"
def __str__(self) -> str:
return repr(self)
class RetryAction(BaseAction):
REPR_FIELDS = ("sleep",)
NAME = "retry"
def __init__(self, sleep: t.SupportsFloat) -> None:
self.sleep = float(sleep)
_unset = object()
def _first_set(first: t.Union[t.Any, object], second: t.Any) -> t.Any:
return second if first is _unset else first
class RetryError(Exception):
def __init__(self, last_attempt: "Future") -> None:
self.last_attempt = last_attempt
super().__init__(last_attempt)
def reraise(self) -> "t.NoReturn":
if self.last_attempt.failed:
raise self.last_attempt.result()
raise self
def __str__(self) -> str:
return f"{self.__class__.__name__}[{self.last_attempt}]"
class AttemptManager:
def __init__(self, retry_state: "RetryCallState"):
self.retry_state = retry_state
def __enter__(self) -> None:
pass
def __exit__(
self,
exc_type: t.Optional[t.Type[BaseException]],
exc_value: t.Optional[BaseException],
traceback: t.Optional["types.TracebackType"],
) -> t.Optional[bool]:
if exc_type is not None and exc_value is not None:
self.retry_state.set_exception((exc_type, exc_value, traceback))
return True else:
self.retry_state.set_result(None)
return None
class BaseRetrying(ABC):
def __init__(
self,
sleep: t.Callable[[t.Union[int, float]], None] = sleep,
stop: "StopBaseT" = stop_never,
wait: "WaitBaseT" = wait_none(),
retry: "RetryBaseT" = retry_if_exception_type(),
before: t.Callable[["RetryCallState"], None] = before_nothing,
after: t.Callable[["RetryCallState"], None] = after_nothing,
before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
reraise: bool = False,
retry_error_cls: t.Type[RetryError] = RetryError,
retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
):
self.sleep = sleep
self.stop = stop
self.wait = wait
self.retry = retry
self.before = before
self.after = after
self.before_sleep = before_sleep
self.reraise = reraise
self._local = threading.local()
self.retry_error_cls = retry_error_cls
self.retry_error_callback = retry_error_callback
def copy(
self,
sleep: t.Union[t.Callable[[t.Union[int, float]], None], object] = _unset,
stop: t.Union["StopBaseT", object] = _unset,
wait: t.Union["WaitBaseT", object] = _unset,
retry: t.Union[retry_base, object] = _unset,
before: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
after: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
before_sleep: t.Union[t.Optional[t.Callable[["RetryCallState"], None]], object] = _unset,
reraise: t.Union[bool, object] = _unset,
retry_error_cls: t.Union[t.Type[RetryError], object] = _unset,
retry_error_callback: t.Union[t.Optional[t.Callable[["RetryCallState"], t.Any]], object] = _unset,
) -> "BaseRetrying":
return self.__class__(
sleep=_first_set(sleep, self.sleep),
stop=_first_set(stop, self.stop),
wait=_first_set(wait, self.wait),
retry=_first_set(retry, self.retry),
before=_first_set(before, self.before),
after=_first_set(after, self.after),
before_sleep=_first_set(before_sleep, self.before_sleep),
reraise=_first_set(reraise, self.reraise),
retry_error_cls=_first_set(retry_error_cls, self.retry_error_cls),
retry_error_callback=_first_set(retry_error_callback, self.retry_error_callback),
)
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} object at 0x{id(self):x} ("
f"stop={self.stop}, "
f"wait={self.wait}, "
f"sleep={self.sleep}, "
f"retry={self.retry}, "
f"before={self.before}, "
f"after={self.after})>"
)
@property
def statistics(self) -> t.Dict[str, t.Any]:
try:
return self._local.statistics except AttributeError:
self._local.statistics = t.cast(t.Dict[str, t.Any], {})
return self._local.statistics
def wraps(self, f: WrappedFn) -> WrappedFn:
@functools.wraps(f)
def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
return self(f, *args, **kw)
def retry_with(*args: t.Any, **kwargs: t.Any) -> WrappedFn:
return self.copy(*args, **kwargs).wraps(f)
wrapped_f.retry = self wrapped_f.retry_with = retry_with
return wrapped_f
def begin(self) -> None:
self.statistics.clear()
self.statistics["start_time"] = time.monotonic()
self.statistics["attempt_number"] = 1
self.statistics["idle_for"] = 0
def iter(self, retry_state: "RetryCallState") -> t.Union[DoAttempt, DoSleep, t.Any]: fut = retry_state.outcome
if fut is None:
if self.before is not None:
self.before(retry_state)
return DoAttempt()
is_explicit_retry = fut.failed and isinstance(fut.exception(), TryAgain)
if not (is_explicit_retry or self.retry(retry_state)):
return fut.result()
if self.after is not None:
self.after(retry_state)
self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
if self.stop(retry_state):
if self.retry_error_callback:
return self.retry_error_callback(retry_state)
retry_exc = self.retry_error_cls(fut)
if self.reraise:
raise retry_exc.reraise()
raise retry_exc from fut.exception()
if self.wait:
sleep = self.wait(retry_state)
else:
sleep = 0.0
retry_state.next_action = RetryAction(sleep)
retry_state.idle_for += sleep
self.statistics["idle_for"] += sleep
self.statistics["attempt_number"] += 1
if self.before_sleep is not None:
self.before_sleep(retry_state)
return DoSleep(sleep)
def __iter__(self) -> t.Generator[AttemptManager, None, None]:
self.begin()
retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
while True:
do = self.iter(retry_state=retry_state)
if isinstance(do, DoAttempt):
yield AttemptManager(retry_state=retry_state)
elif isinstance(do, DoSleep):
retry_state.prepare_for_next_attempt()
self.sleep(do)
else:
break
@abstractmethod
def __call__(
self,
fn: t.Callable[..., WrappedFnReturnT],
*args: t.Any,
**kwargs: t.Any,
) -> WrappedFnReturnT:
pass
class Retrying(BaseRetrying):
def __call__(
self,
fn: t.Callable[..., WrappedFnReturnT],
*args: t.Any,
**kwargs: t.Any,
) -> WrappedFnReturnT:
self.begin()
retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
while True:
do = self.iter(retry_state=retry_state)
if isinstance(do, DoAttempt):
try:
result = fn(*args, **kwargs)
except BaseException: retry_state.set_exception(sys.exc_info()) else:
retry_state.set_result(result)
elif isinstance(do, DoSleep):
retry_state.prepare_for_next_attempt()
self.sleep(do)
else:
return do
if sys.version_info[1] >= 9:
FutureGenericT = futures.Future[t.Any]
else:
FutureGenericT = futures.Future
class Future(FutureGenericT):
def __init__(self, attempt_number: int) -> None:
super().__init__()
self.attempt_number = attempt_number
@property
def failed(self) -> bool:
return self.exception() is not None
@classmethod
def construct(cls, attempt_number: int, value: t.Any, has_exception: bool) -> "Future":
fut = cls(attempt_number)
if has_exception:
fut.set_exception(value)
else:
fut.set_result(value)
return fut
class RetryCallState:
def __init__(
self,
retry_object: BaseRetrying,
fn: t.Optional[WrappedFn],
args: t.Any,
kwargs: t.Any,
) -> None:
self.start_time = time.monotonic()
self.retry_object = retry_object
self.fn = fn
self.args = args
self.kwargs = kwargs
self.attempt_number: int = 1
self.outcome: t.Optional[Future] = None
self.outcome_timestamp: t.Optional[float] = None
self.idle_for: float = 0.0
self.next_action: t.Optional[RetryAction] = None
@property
def seconds_since_start(self) -> t.Optional[float]:
if self.outcome_timestamp is None:
return None
return self.outcome_timestamp - self.start_time
def prepare_for_next_attempt(self) -> None:
self.outcome = None
self.outcome_timestamp = None
self.attempt_number += 1
self.next_action = None
def set_result(self, val: t.Any) -> None:
ts = time.monotonic()
fut = Future(self.attempt_number)
fut.set_result(val)
self.outcome, self.outcome_timestamp = fut, ts
def set_exception(
self, exc_info: t.Tuple[t.Type[BaseException], BaseException, "types.TracebackType| None"]
) -> None:
ts = time.monotonic()
fut = Future(self.attempt_number)
fut.set_exception(exc_info[1])
self.outcome, self.outcome_timestamp = fut, ts
def __repr__(self) -> str:
if self.outcome is None:
result = "none yet"
elif self.outcome.failed:
exception = self.outcome.exception()
result = f"failed ({exception.__class__.__name__} {exception})"
else:
result = f"returned {self.outcome.result()}"
slept = float(round(self.idle_for, 2))
clsname = self.__class__.__name__
return f"<{clsname} {id(self)}: attempt #{self.attempt_number}; slept for {slept}; last result: {result}>"
@t.overload
def retry(func: WrappedFn) -> WrappedFn:
...
@t.overload
def retry(
sleep: t.Callable[[t.Union[int, float]], None] = sleep,
stop: "StopBaseT" = stop_never,
wait: "WaitBaseT" = wait_none(),
retry: "RetryBaseT" = retry_if_exception_type(),
before: t.Callable[["RetryCallState"], None] = before_nothing,
after: t.Callable[["RetryCallState"], None] = after_nothing,
before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
reraise: bool = False,
retry_error_cls: t.Type["RetryError"] = RetryError,
retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
) -> t.Callable[[WrappedFn], WrappedFn]:
...
def retry(*dargs: t.Any, **dkw: t.Any) -> t.Any:
if len(dargs) == 1 and callable(dargs[0]):
return retry()(dargs[0])
else:
def wrap(f: WrappedFn) -> WrappedFn:
if isinstance(f, retry_base):
warnings.warn(
f"Got retry_base instance ({f.__class__.__name__}) as callable argument, "
f"this will probably hang indefinitely (did you mean retry={f.__class__.__name__}(...)?)"
)
r: "BaseRetrying"
if iscoroutinefunction(f):
r = AsyncRetrying(*dargs, **dkw)
elif tornado and hasattr(tornado.gen, "is_coroutine_function") and tornado.gen.is_coroutine_function(f):
r = TornadoRetrying(*dargs, **dkw)
else:
r = Retrying(*dargs, **dkw)
return r.wraps(f)
return wrap
from pip._vendor.tenacity._asyncio import AsyncRetrying
if tornado:
from pip._vendor.tenacity.tornadoweb import TornadoRetrying
__all__ = [
"retry_base",
"retry_all",
"retry_always",
"retry_any",
"retry_if_exception",
"retry_if_exception_type",
"retry_if_exception_cause_type",
"retry_if_not_exception_type",
"retry_if_not_result",
"retry_if_result",
"retry_never",
"retry_unless_exception_type",
"retry_if_exception_message",
"retry_if_not_exception_message",
"sleep",
"sleep_using_event",
"stop_after_attempt",
"stop_after_delay",
"stop_all",
"stop_any",
"stop_never",
"stop_when_event_set",
"wait_chain",
"wait_combine",
"wait_exponential",
"wait_fixed",
"wait_incrementing",
"wait_none",
"wait_random",
"wait_random_exponential",
"wait_full_jitter",
"wait_exponential_jitter",
"before_log",
"before_nothing",
"after_log",
"after_nothing",
"before_sleep_log",
"before_sleep_nothing",
"retry",
"WrappedFn",
"TryAgain",
"NO_RESULT",
"DoAttempt",
"DoSleep",
"BaseAction",
"RetryAction",
"RetryError",
"AttemptManager",
"BaseRetrying",
"Retrying",
"Future",
"RetryCallState",
"AsyncRetrying",
]