import io
import sys
import typing
import warnings
from abc import ABC, abstractmethod
from collections import deque
from dataclasses import dataclass, field
from datetime import timedelta
from io import RawIOBase, UnsupportedOperation
from math import ceil
from mmap import mmap
from operator import length_hint
from os import PathLike, stat
from threading import Event, RLock, Thread
from types import TracebackType
from typing import (
Any,
BinaryIO,
Callable,
ContextManager,
Deque,
Dict,
Generic,
Iterable,
List,
NamedTuple,
NewType,
Optional,
Sequence,
TextIO,
Tuple,
Type,
TypeVar,
Union,
)
if sys.version_info >= (3, 8):
from typing import Literal
else:
from pip._vendor.typing_extensions import Literal
from . import filesize, get_console
from .console import Console, Group, JustifyMethod, RenderableType
from .highlighter import Highlighter
from .jupyter import JupyterMixin
from .live import Live
from .progress_bar import ProgressBar
from .spinner import Spinner
from .style import StyleType
from .table import Column, Table
from .text import Text, TextType
TaskID = NewType("TaskID", int)
ProgressType = TypeVar("ProgressType")
GetTimeCallable = Callable[[], float]
_I = typing.TypeVar("_I", TextIO, BinaryIO)
class _TrackThread(Thread):
def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float):
self.progress = progress
self.task_id = task_id
self.update_period = update_period
self.done = Event()
self.completed = 0
super().__init__()
def run(self) -> None:
task_id = self.task_id
advance = self.progress.advance
update_period = self.update_period
last_completed = 0
wait = self.done.wait
while not wait(update_period):
completed = self.completed
if last_completed != completed:
advance(task_id, completed - last_completed)
last_completed = completed
self.progress.update(self.task_id, completed=self.completed, refresh=True)
def __enter__(self) -> "_TrackThread":
self.start()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.done.set()
self.join()
def track(
sequence: Union[Sequence[ProgressType], Iterable[ProgressType]],
description: str = "Working...",
total: Optional[float] = None,
auto_refresh: bool = True,
console: Optional[Console] = None,
transient: bool = False,
get_time: Optional[Callable[[], float]] = None,
refresh_per_second: float = 10,
style: StyleType = "bar.back",
complete_style: StyleType = "bar.complete",
finished_style: StyleType = "bar.finished",
pulse_style: StyleType = "bar.pulse",
update_period: float = 0.1,
disable: bool = False,
show_speed: bool = True,
) -> Iterable[ProgressType]:
columns: List["ProgressColumn"] = (
[TextColumn("[progress.description]{task.description}")] if description else []
)
columns.extend(
(
BarColumn(
style=style,
complete_style=complete_style,
finished_style=finished_style,
pulse_style=pulse_style,
),
TaskProgressColumn(show_speed=show_speed),
TimeRemainingColumn(elapsed_when_finished=True),
)
)
progress = Progress(
*columns,
auto_refresh=auto_refresh,
console=console,
transient=transient,
get_time=get_time,
refresh_per_second=refresh_per_second or 10,
disable=disable,
)
with progress:
yield from progress.track(
sequence, total=total, description=description, update_period=update_period
)
class _Reader(RawIOBase, BinaryIO):
def __init__(
self,
handle: BinaryIO,
progress: "Progress",
task: TaskID,
close_handle: bool = True,
) -> None:
self.handle = handle
self.progress = progress
self.task = task
self.close_handle = close_handle
self._closed = False
def __enter__(self) -> "_Reader":
self.handle.__enter__()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.close()
def __iter__(self) -> BinaryIO:
return self
def __next__(self) -> bytes:
line = next(self.handle)
self.progress.advance(self.task, advance=len(line))
return line
@property
def closed(self) -> bool:
return self._closed
def fileno(self) -> int:
return self.handle.fileno()
def isatty(self) -> bool:
return self.handle.isatty()
@property
def mode(self) -> str:
return self.handle.mode
@property
def name(self) -> str:
return self.handle.name
def readable(self) -> bool:
return self.handle.readable()
def seekable(self) -> bool:
return self.handle.seekable()
def writable(self) -> bool:
return False
def read(self, size: int = -1) -> bytes:
block = self.handle.read(size)
self.progress.advance(self.task, advance=len(block))
return block
def readinto(self, b: Union[bytearray, memoryview, mmap]): n = self.handle.readinto(b) self.progress.advance(self.task, advance=n)
return n
def readline(self, size: int = -1) -> bytes: line = self.handle.readline(size)
self.progress.advance(self.task, advance=len(line))
return line
def readlines(self, hint: int = -1) -> List[bytes]:
lines = self.handle.readlines(hint)
self.progress.advance(self.task, advance=sum(map(len, lines)))
return lines
def close(self) -> None:
if self.close_handle:
self.handle.close()
self._closed = True
def seek(self, offset: int, whence: int = 0) -> int:
pos = self.handle.seek(offset, whence)
self.progress.update(self.task, completed=pos)
return pos
def tell(self) -> int:
return self.handle.tell()
def write(self, s: Any) -> int:
raise UnsupportedOperation("write")
class _ReadContext(ContextManager[_I], Generic[_I]):
def __init__(self, progress: "Progress", reader: _I) -> None:
self.progress = progress
self.reader: _I = reader
def __enter__(self) -> _I:
self.progress.start()
return self.reader.__enter__()
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.progress.stop()
self.reader.__exit__(exc_type, exc_val, exc_tb)
def wrap_file(
file: BinaryIO,
total: int,
*,
description: str = "Reading...",
auto_refresh: bool = True,
console: Optional[Console] = None,
transient: bool = False,
get_time: Optional[Callable[[], float]] = None,
refresh_per_second: float = 10,
style: StyleType = "bar.back",
complete_style: StyleType = "bar.complete",
finished_style: StyleType = "bar.finished",
pulse_style: StyleType = "bar.pulse",
disable: bool = False,
) -> ContextManager[BinaryIO]:
columns: List["ProgressColumn"] = (
[TextColumn("[progress.description]{task.description}")] if description else []
)
columns.extend(
(
BarColumn(
style=style,
complete_style=complete_style,
finished_style=finished_style,
pulse_style=pulse_style,
),
DownloadColumn(),
TimeRemainingColumn(),
)
)
progress = Progress(
*columns,
auto_refresh=auto_refresh,
console=console,
transient=transient,
get_time=get_time,
refresh_per_second=refresh_per_second or 10,
disable=disable,
)
reader = progress.wrap_file(file, total=total, description=description)
return _ReadContext(progress, reader)
@typing.overload
def open(
file: Union[str, "PathLike[str]", bytes],
mode: Union[Literal["rt"], Literal["r"]],
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
*,
total: Optional[int] = None,
description: str = "Reading...",
auto_refresh: bool = True,
console: Optional[Console] = None,
transient: bool = False,
get_time: Optional[Callable[[], float]] = None,
refresh_per_second: float = 10,
style: StyleType = "bar.back",
complete_style: StyleType = "bar.complete",
finished_style: StyleType = "bar.finished",
pulse_style: StyleType = "bar.pulse",
disable: bool = False,
) -> ContextManager[TextIO]:
pass
@typing.overload
def open(
file: Union[str, "PathLike[str]", bytes],
mode: Literal["rb"],
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
*,
total: Optional[int] = None,
description: str = "Reading...",
auto_refresh: bool = True,
console: Optional[Console] = None,
transient: bool = False,
get_time: Optional[Callable[[], float]] = None,
refresh_per_second: float = 10,
style: StyleType = "bar.back",
complete_style: StyleType = "bar.complete",
finished_style: StyleType = "bar.finished",
pulse_style: StyleType = "bar.pulse",
disable: bool = False,
) -> ContextManager[BinaryIO]:
pass
def open(
file: Union[str, "PathLike[str]", bytes],
mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
*,
total: Optional[int] = None,
description: str = "Reading...",
auto_refresh: bool = True,
console: Optional[Console] = None,
transient: bool = False,
get_time: Optional[Callable[[], float]] = None,
refresh_per_second: float = 10,
style: StyleType = "bar.back",
complete_style: StyleType = "bar.complete",
finished_style: StyleType = "bar.finished",
pulse_style: StyleType = "bar.pulse",
disable: bool = False,
) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]:
columns: List["ProgressColumn"] = (
[TextColumn("[progress.description]{task.description}")] if description else []
)
columns.extend(
(
BarColumn(
style=style,
complete_style=complete_style,
finished_style=finished_style,
pulse_style=pulse_style,
),
DownloadColumn(),
TimeRemainingColumn(),
)
)
progress = Progress(
*columns,
auto_refresh=auto_refresh,
console=console,
transient=transient,
get_time=get_time,
refresh_per_second=refresh_per_second or 10,
disable=disable,
)
reader = progress.open(
file,
mode=mode,
buffering=buffering,
encoding=encoding,
errors=errors,
newline=newline,
total=total,
description=description,
)
return _ReadContext(progress, reader)
class ProgressColumn(ABC):
max_refresh: Optional[float] = None
def __init__(self, table_column: Optional[Column] = None) -> None:
self._table_column = table_column
self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {}
self._update_time: Optional[float] = None
def get_table_column(self) -> Column:
return self._table_column or Column()
def __call__(self, task: "Task") -> RenderableType:
current_time = task.get_time()
if self.max_refresh is not None and not task.completed:
try:
timestamp, renderable = self._renderable_cache[task.id]
except KeyError:
pass
else:
if timestamp + self.max_refresh > current_time:
return renderable
renderable = self.render(task)
self._renderable_cache[task.id] = (current_time, renderable)
return renderable
@abstractmethod
def render(self, task: "Task") -> RenderableType:
class RenderableColumn(ProgressColumn):
def __init__(
self, renderable: RenderableType = "", *, table_column: Optional[Column] = None
):
self.renderable = renderable
super().__init__(table_column=table_column)
def render(self, task: "Task") -> RenderableType:
return self.renderable
class SpinnerColumn(ProgressColumn):
def __init__(
self,
spinner_name: str = "dots",
style: Optional[StyleType] = "progress.spinner",
speed: float = 1.0,
finished_text: TextType = " ",
table_column: Optional[Column] = None,
):
self.spinner = Spinner(spinner_name, style=style, speed=speed)
self.finished_text = (
Text.from_markup(finished_text)
if isinstance(finished_text, str)
else finished_text
)
super().__init__(table_column=table_column)
def set_spinner(
self,
spinner_name: str,
spinner_style: Optional[StyleType] = "progress.spinner",
speed: float = 1.0,
) -> None:
self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed)
def render(self, task: "Task") -> RenderableType:
text = (
self.finished_text
if task.finished
else self.spinner.render(task.get_time())
)
return text
class TextColumn(ProgressColumn):
def __init__(
self,
text_format: str,
style: StyleType = "none",
justify: JustifyMethod = "left",
markup: bool = True,
highlighter: Optional[Highlighter] = None,
table_column: Optional[Column] = None,
) -> None:
self.text_format = text_format
self.justify: JustifyMethod = justify
self.style = style
self.markup = markup
self.highlighter = highlighter
super().__init__(table_column=table_column or Column(no_wrap=True))
def render(self, task: "Task") -> Text:
_text = self.text_format.format(task=task)
if self.markup:
text = Text.from_markup(_text, style=self.style, justify=self.justify)
else:
text = Text(_text, style=self.style, justify=self.justify)
if self.highlighter:
self.highlighter.highlight(text)
return text
class BarColumn(ProgressColumn):
def __init__(
self,
bar_width: Optional[int] = 40,
style: StyleType = "bar.back",
complete_style: StyleType = "bar.complete",
finished_style: StyleType = "bar.finished",
pulse_style: StyleType = "bar.pulse",
table_column: Optional[Column] = None,
) -> None:
self.bar_width = bar_width
self.style = style
self.complete_style = complete_style
self.finished_style = finished_style
self.pulse_style = pulse_style
super().__init__(table_column=table_column)
def render(self, task: "Task") -> ProgressBar:
return ProgressBar(
total=max(0, task.total) if task.total is not None else None,
completed=max(0, task.completed),
width=None if self.bar_width is None else max(1, self.bar_width),
pulse=not task.started,
animation_time=task.get_time(),
style=self.style,
complete_style=self.complete_style,
finished_style=self.finished_style,
pulse_style=self.pulse_style,
)
class TimeElapsedColumn(ProgressColumn):
def render(self, task: "Task") -> Text:
elapsed = task.finished_time if task.finished else task.elapsed
if elapsed is None:
return Text("-:--:--", style="progress.elapsed")
delta = timedelta(seconds=int(elapsed))
return Text(str(delta), style="progress.elapsed")
class TaskProgressColumn(TextColumn):
def __init__(
self,
text_format: str = "[progress.percentage]{task.percentage:>3.0f}%",
text_format_no_percentage: str = "",
style: StyleType = "none",
justify: JustifyMethod = "left",
markup: bool = True,
highlighter: Optional[Highlighter] = None,
table_column: Optional[Column] = None,
show_speed: bool = False,
) -> None:
self.text_format_no_percentage = text_format_no_percentage
self.show_speed = show_speed
super().__init__(
text_format=text_format,
style=style,
justify=justify,
markup=markup,
highlighter=highlighter,
table_column=table_column,
)
@classmethod
def render_speed(cls, speed: Optional[float]) -> Text:
if speed is None:
return Text("", style="progress.percentage")
unit, suffix = filesize.pick_unit_and_suffix(
int(speed),
["", "×10³", "×10⁶", "×10⁹", "×10¹²"],
1000,
)
data_speed = speed / unit
return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage")
def render(self, task: "Task") -> Text:
if task.total is None and self.show_speed:
return self.render_speed(task.finished_speed or task.speed)
text_format = (
self.text_format_no_percentage if task.total is None else self.text_format
)
_text = text_format.format(task=task)
if self.markup:
text = Text.from_markup(_text, style=self.style, justify=self.justify)
else:
text = Text(_text, style=self.style, justify=self.justify)
if self.highlighter:
self.highlighter.highlight(text)
return text
class TimeRemainingColumn(ProgressColumn):
max_refresh = 0.5
def __init__(
self,
compact: bool = False,
elapsed_when_finished: bool = False,
table_column: Optional[Column] = None,
):
self.compact = compact
self.elapsed_when_finished = elapsed_when_finished
super().__init__(table_column=table_column)
def render(self, task: "Task") -> Text:
if self.elapsed_when_finished and task.finished:
task_time = task.finished_time
style = "progress.elapsed"
else:
task_time = task.time_remaining
style = "progress.remaining"
if task.total is None:
return Text("", style=style)
if task_time is None:
return Text("--:--" if self.compact else "-:--:--", style=style)
minutes, seconds = divmod(int(task_time), 60)
hours, minutes = divmod(minutes, 60)
if self.compact and not hours:
formatted = f"{minutes:02d}:{seconds:02d}"
else:
formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}"
return Text(formatted, style=style)
class FileSizeColumn(ProgressColumn):
def render(self, task: "Task") -> Text:
data_size = filesize.decimal(int(task.completed))
return Text(data_size, style="progress.filesize")
class TotalFileSizeColumn(ProgressColumn):
def render(self, task: "Task") -> Text:
data_size = filesize.decimal(int(task.total)) if task.total is not None else ""
return Text(data_size, style="progress.filesize.total")
class MofNCompleteColumn(ProgressColumn):
def __init__(self, separator: str = "/", table_column: Optional[Column] = None):
self.separator = separator
super().__init__(table_column=table_column)
def render(self, task: "Task") -> Text:
completed = int(task.completed)
total = int(task.total) if task.total is not None else "?"
total_width = len(str(total))
return Text(
f"{completed:{total_width}d}{self.separator}{total}",
style="progress.download",
)
class DownloadColumn(ProgressColumn):
def __init__(
self, binary_units: bool = False, table_column: Optional[Column] = None
) -> None:
self.binary_units = binary_units
super().__init__(table_column=table_column)
def render(self, task: "Task") -> Text:
completed = int(task.completed)
unit_and_suffix_calculation_base = (
int(task.total) if task.total is not None else completed
)
if self.binary_units:
unit, suffix = filesize.pick_unit_and_suffix(
unit_and_suffix_calculation_base,
["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"],
1024,
)
else:
unit, suffix = filesize.pick_unit_and_suffix(
unit_and_suffix_calculation_base,
["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"],
1000,
)
precision = 0 if unit == 1 else 1
completed_ratio = completed / unit
completed_str = f"{completed_ratio:,.{precision}f}"
if task.total is not None:
total = int(task.total)
total_ratio = total / unit
total_str = f"{total_ratio:,.{precision}f}"
else:
total_str = "?"
download_status = f"{completed_str}/{total_str} {suffix}"
download_text = Text(download_status, style="progress.download")
return download_text
class TransferSpeedColumn(ProgressColumn):
def render(self, task: "Task") -> Text:
speed = task.finished_speed or task.speed
if speed is None:
return Text("?", style="progress.data.speed")
data_speed = filesize.decimal(int(speed))
return Text(f"{data_speed}/s", style="progress.data.speed")
class ProgressSample(NamedTuple):
timestamp: float
completed: float
@dataclass
class Task:
id: TaskID
description: str
total: Optional[float]
completed: float
_get_time: GetTimeCallable
finished_time: Optional[float] = None
visible: bool = True
fields: Dict[str, Any] = field(default_factory=dict)
start_time: Optional[float] = field(default=None, init=False, repr=False)
stop_time: Optional[float] = field(default=None, init=False, repr=False)
finished_speed: Optional[float] = None
_progress: Deque[ProgressSample] = field(
default_factory=lambda: deque(maxlen=1000), init=False, repr=False
)
_lock: RLock = field(repr=False, default_factory=RLock)
def get_time(self) -> float:
return self._get_time()
@property
def started(self) -> bool:
return self.start_time is not None
@property
def remaining(self) -> Optional[float]:
if self.total is None:
return None
return self.total - self.completed
@property
def elapsed(self) -> Optional[float]:
if self.start_time is None:
return None
if self.stop_time is not None:
return self.stop_time - self.start_time
return self.get_time() - self.start_time
@property
def finished(self) -> bool:
return self.finished_time is not None
@property
def percentage(self) -> float:
if not self.total:
return 0.0
completed = (self.completed / self.total) * 100.0
completed = min(100.0, max(0.0, completed))
return completed
@property
def speed(self) -> Optional[float]:
if self.start_time is None:
return None
with self._lock:
progress = self._progress
if not progress:
return None
total_time = progress[-1].timestamp - progress[0].timestamp
if total_time == 0:
return None
iter_progress = iter(progress)
next(iter_progress)
total_completed = sum(sample.completed for sample in iter_progress)
speed = total_completed / total_time
return speed
@property
def time_remaining(self) -> Optional[float]:
if self.finished:
return 0.0
speed = self.speed
if not speed:
return None
remaining = self.remaining
if remaining is None:
return None
estimate = ceil(remaining / speed)
return estimate
def _reset(self) -> None:
self._progress.clear()
self.finished_time = None
self.finished_speed = None
class Progress(JupyterMixin):
def __init__(
self,
*columns: Union[str, ProgressColumn],
console: Optional[Console] = None,
auto_refresh: bool = True,
refresh_per_second: float = 10,
speed_estimate_period: float = 30.0,
transient: bool = False,
redirect_stdout: bool = True,
redirect_stderr: bool = True,
get_time: Optional[GetTimeCallable] = None,
disable: bool = False,
expand: bool = False,
) -> None:
assert refresh_per_second > 0, "refresh_per_second must be > 0"
self._lock = RLock()
self.columns = columns or self.get_default_columns()
self.speed_estimate_period = speed_estimate_period
self.disable = disable
self.expand = expand
self._tasks: Dict[TaskID, Task] = {}
self._task_index: TaskID = TaskID(0)
self.live = Live(
console=console or get_console(),
auto_refresh=auto_refresh,
refresh_per_second=refresh_per_second,
transient=transient,
redirect_stdout=redirect_stdout,
redirect_stderr=redirect_stderr,
get_renderable=self.get_renderable,
)
self.get_time = get_time or self.console.get_time
self.print = self.console.print
self.log = self.console.log
@classmethod
def get_default_columns(cls) -> Tuple[ProgressColumn, ...]:
return (
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
TimeRemainingColumn(),
)
@property
def console(self) -> Console:
return self.live.console
@property
def tasks(self) -> List[Task]:
with self._lock:
return list(self._tasks.values())
@property
def task_ids(self) -> List[TaskID]:
with self._lock:
return list(self._tasks.keys())
@property
def finished(self) -> bool:
with self._lock:
if not self._tasks:
return True
return all(task.finished for task in self._tasks.values())
def start(self) -> None:
if not self.disable:
self.live.start(refresh=True)
def stop(self) -> None:
self.live.stop()
if not self.console.is_interactive:
self.console.print()
def __enter__(self) -> "Progress":
self.start()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.stop()
def track(
self,
sequence: Union[Iterable[ProgressType], Sequence[ProgressType]],
total: Optional[float] = None,
task_id: Optional[TaskID] = None,
description: str = "Working...",
update_period: float = 0.1,
) -> Iterable[ProgressType]:
if total is None:
total = float(length_hint(sequence)) or None
if task_id is None:
task_id = self.add_task(description, total=total)
else:
self.update(task_id, total=total)
if self.live.auto_refresh:
with _TrackThread(self, task_id, update_period) as track_thread:
for value in sequence:
yield value
track_thread.completed += 1
else:
advance = self.advance
refresh = self.refresh
for value in sequence:
yield value
advance(task_id, 1)
refresh()
def wrap_file(
self,
file: BinaryIO,
total: Optional[int] = None,
*,
task_id: Optional[TaskID] = None,
description: str = "Reading...",
) -> BinaryIO:
total_bytes: Optional[float] = None
if total is not None:
total_bytes = total
elif task_id is not None:
with self._lock:
total_bytes = self._tasks[task_id].total
if total_bytes is None:
raise ValueError(
f"unable to get the total number of bytes, please specify 'total'"
)
if task_id is None:
task_id = self.add_task(description, total=total_bytes)
else:
self.update(task_id, total=total_bytes)
return _Reader(file, self, task_id, close_handle=False)
@typing.overload
def open(
self,
file: Union[str, "PathLike[str]", bytes],
mode: Literal["rb"],
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
*,
total: Optional[int] = None,
task_id: Optional[TaskID] = None,
description: str = "Reading...",
) -> BinaryIO:
pass
@typing.overload
def open(
self,
file: Union[str, "PathLike[str]", bytes],
mode: Union[Literal["r"], Literal["rt"]],
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
*,
total: Optional[int] = None,
task_id: Optional[TaskID] = None,
description: str = "Reading...",
) -> TextIO:
pass
def open(
self,
file: Union[str, "PathLike[str]", bytes],
mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
*,
total: Optional[int] = None,
task_id: Optional[TaskID] = None,
description: str = "Reading...",
) -> Union[BinaryIO, TextIO]:
_mode = "".join(sorted(mode, reverse=False))
if _mode not in ("br", "rt", "r"):
raise ValueError("invalid mode {!r}".format(mode))
line_buffering = buffering == 1
if _mode == "br" and buffering == 1:
warnings.warn(
"line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used",
RuntimeWarning,
)
buffering = -1
elif _mode in ("rt", "r"):
if buffering == 0:
raise ValueError("can't have unbuffered text I/O")
elif buffering == 1:
buffering = -1
if total is None:
total = stat(file).st_size
if task_id is None:
task_id = self.add_task(description, total=total)
else:
self.update(task_id, total=total)
handle = io.open(file, "rb", buffering=buffering)
reader = _Reader(handle, self, task_id, close_handle=True)
if mode in ("r", "rt"):
return io.TextIOWrapper(
reader,
encoding=encoding,
errors=errors,
newline=newline,
line_buffering=line_buffering,
)
return reader
def start_task(self, task_id: TaskID) -> None:
with self._lock:
task = self._tasks[task_id]
if task.start_time is None:
task.start_time = self.get_time()
def stop_task(self, task_id: TaskID) -> None:
with self._lock:
task = self._tasks[task_id]
current_time = self.get_time()
if task.start_time is None:
task.start_time = current_time
task.stop_time = current_time
def update(
self,
task_id: TaskID,
*,
total: Optional[float] = None,
completed: Optional[float] = None,
advance: Optional[float] = None,
description: Optional[str] = None,
visible: Optional[bool] = None,
refresh: bool = False,
**fields: Any,
) -> None:
with self._lock:
task = self._tasks[task_id]
completed_start = task.completed
if total is not None and total != task.total:
task.total = total
task._reset()
if advance is not None:
task.completed += advance
if completed is not None:
task.completed = completed
if description is not None:
task.description = description
if visible is not None:
task.visible = visible
task.fields.update(fields)
update_completed = task.completed - completed_start
current_time = self.get_time()
old_sample_time = current_time - self.speed_estimate_period
_progress = task._progress
popleft = _progress.popleft
while _progress and _progress[0].timestamp < old_sample_time:
popleft()
if update_completed > 0:
_progress.append(ProgressSample(current_time, update_completed))
if (
task.total is not None
and task.completed >= task.total
and task.finished_time is None
):
task.finished_time = task.elapsed
if refresh:
self.refresh()
def reset(
self,
task_id: TaskID,
*,
start: bool = True,
total: Optional[float] = None,
completed: int = 0,
visible: Optional[bool] = None,
description: Optional[str] = None,
**fields: Any,
) -> None:
current_time = self.get_time()
with self._lock:
task = self._tasks[task_id]
task._reset()
task.start_time = current_time if start else None
if total is not None:
task.total = total
task.completed = completed
if visible is not None:
task.visible = visible
if fields:
task.fields = fields
if description is not None:
task.description = description
task.finished_time = None
self.refresh()
def advance(self, task_id: TaskID, advance: float = 1) -> None:
current_time = self.get_time()
with self._lock:
task = self._tasks[task_id]
completed_start = task.completed
task.completed += advance
update_completed = task.completed - completed_start
old_sample_time = current_time - self.speed_estimate_period
_progress = task._progress
popleft = _progress.popleft
while _progress and _progress[0].timestamp < old_sample_time:
popleft()
while len(_progress) > 1000:
popleft()
_progress.append(ProgressSample(current_time, update_completed))
if (
task.total is not None
and task.completed >= task.total
and task.finished_time is None
):
task.finished_time = task.elapsed
task.finished_speed = task.speed
def refresh(self) -> None:
if not self.disable and self.live.is_started:
self.live.refresh()
def get_renderable(self) -> RenderableType:
renderable = Group(*self.get_renderables())
return renderable
def get_renderables(self) -> Iterable[RenderableType]:
table = self.make_tasks_table(self.tasks)
yield table
def make_tasks_table(self, tasks: Iterable[Task]) -> Table:
table_columns = (
(
Column(no_wrap=True)
if isinstance(_column, str)
else _column.get_table_column().copy()
)
for _column in self.columns
)
table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand)
for task in tasks:
if task.visible:
table.add_row(
*(
(
column.format(task=task)
if isinstance(column, str)
else column(task)
)
for column in self.columns
)
)
return table
def __rich__(self) -> RenderableType:
with self._lock:
return self.get_renderable()
def add_task(
self,
description: str,
start: bool = True,
total: Optional[float] = 100.0,
completed: int = 0,
visible: bool = True,
**fields: Any,
) -> TaskID:
with self._lock:
task = Task(
self._task_index,
description,
total,
completed,
visible=visible,
fields=fields,
_get_time=self.get_time,
_lock=self._lock,
)
self._tasks[self._task_index] = task
if start:
self.start_task(self._task_index)
new_task_index = self._task_index
self._task_index = TaskID(int(self._task_index) + 1)
self.refresh()
return new_task_index
def remove_task(self, task_id: TaskID) -> None:
with self._lock:
del self._tasks[task_id]
if __name__ == "__main__":
import random
import time
from .panel import Panel
from .rule import Rule
from .syntax import Syntax
from .table import Table
syntax = Syntax(
'''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
"""Iterate and generate a tuple with a flag for last value."""
iter_values = iter(values)
try:
previous_value = next(iter_values)
except StopIteration:
return
for value in iter_values:
yield False, previous_value
previous_value = value
yield True, previous_value''',
"python",
line_numbers=True,
)
table = Table("foo", "bar", "baz")
table.add_row("1", "2", "3")
progress_renderables = [
"Text may be printed while the progress bars are rendering.",
Panel("In fact, [i]any[/i] renderable will work"),
"Such as [magenta]tables[/]...",
table,
"Pretty printed structures...",
{"type": "example", "text": "Pretty printed"},
"Syntax...",
syntax,
Rule("Give it a try!"),
]
from itertools import cycle
examples = cycle(progress_renderables)
console = Console(record=True)
with Progress(
SpinnerColumn(),
*Progress.get_default_columns(),
TimeElapsedColumn(),
console=console,
transient=False,
) as progress:
task1 = progress.add_task("[red]Downloading", total=1000)
task2 = progress.add_task("[green]Processing", total=1000)
task3 = progress.add_task("[yellow]Thinking", total=None)
while not progress.finished:
progress.update(task1, advance=0.5)
progress.update(task2, advance=0.3)
time.sleep(0.01)
if random.randint(0, 100) < 1:
progress.log(next(examples))