import logging
from datetime import datetime
from logging import Handler, LogRecord
from pathlib import Path
from types import ModuleType
from typing import ClassVar, Iterable, List, Optional, Type, Union
from pip._vendor.rich._null_file import NullFile
from . import get_console
from ._log_render import FormatTimeCallable, LogRender
from .console import Console, ConsoleRenderable
from .highlighter import Highlighter, ReprHighlighter
from .text import Text
from .traceback import Traceback
class RichHandler(Handler):
KEYWORDS: ClassVar[Optional[List[str]]] = [
"GET",
"POST",
"HEAD",
"PUT",
"DELETE",
"OPTIONS",
"TRACE",
"PATCH",
]
HIGHLIGHTER_CLASS: ClassVar[Type[Highlighter]] = ReprHighlighter
def __init__(
self,
level: Union[int, str] = logging.NOTSET,
console: Optional[Console] = None,
*,
show_time: bool = True,
omit_repeated_times: bool = True,
show_level: bool = True,
show_path: bool = True,
enable_link_path: bool = True,
highlighter: Optional[Highlighter] = None,
markup: bool = False,
rich_tracebacks: bool = False,
tracebacks_width: Optional[int] = None,
tracebacks_extra_lines: int = 3,
tracebacks_theme: Optional[str] = None,
tracebacks_word_wrap: bool = True,
tracebacks_show_locals: bool = False,
tracebacks_suppress: Iterable[Union[str, ModuleType]] = (),
locals_max_length: int = 10,
locals_max_string: int = 80,
log_time_format: Union[str, FormatTimeCallable] = "[%x %X]",
keywords: Optional[List[str]] = None,
) -> None:
super().__init__(level=level)
self.console = console or get_console()
self.highlighter = highlighter or self.HIGHLIGHTER_CLASS()
self._log_render = LogRender(
show_time=show_time,
show_level=show_level,
show_path=show_path,
time_format=log_time_format,
omit_repeated_times=omit_repeated_times,
level_width=None,
)
self.enable_link_path = enable_link_path
self.markup = markup
self.rich_tracebacks = rich_tracebacks
self.tracebacks_width = tracebacks_width
self.tracebacks_extra_lines = tracebacks_extra_lines
self.tracebacks_theme = tracebacks_theme
self.tracebacks_word_wrap = tracebacks_word_wrap
self.tracebacks_show_locals = tracebacks_show_locals
self.tracebacks_suppress = tracebacks_suppress
self.locals_max_length = locals_max_length
self.locals_max_string = locals_max_string
self.keywords = keywords
def get_level_text(self, record: LogRecord) -> Text:
level_name = record.levelname
level_text = Text.styled(
level_name.ljust(8), f"logging.level.{level_name.lower()}"
)
return level_text
def emit(self, record: LogRecord) -> None:
message = self.format(record)
traceback = None
if (
self.rich_tracebacks
and record.exc_info
and record.exc_info != (None, None, None)
):
exc_type, exc_value, exc_traceback = record.exc_info
assert exc_type is not None
assert exc_value is not None
traceback = Traceback.from_exception(
exc_type,
exc_value,
exc_traceback,
width=self.tracebacks_width,
extra_lines=self.tracebacks_extra_lines,
theme=self.tracebacks_theme,
word_wrap=self.tracebacks_word_wrap,
show_locals=self.tracebacks_show_locals,
locals_max_length=self.locals_max_length,
locals_max_string=self.locals_max_string,
suppress=self.tracebacks_suppress,
)
message = record.getMessage()
if self.formatter:
record.message = record.getMessage()
formatter = self.formatter
if hasattr(formatter, "usesTime") and formatter.usesTime():
record.asctime = formatter.formatTime(record, formatter.datefmt)
message = formatter.formatMessage(record)
message_renderable = self.render_message(record, message)
log_renderable = self.render(
record=record, traceback=traceback, message_renderable=message_renderable
)
if isinstance(self.console.file, NullFile):
self.handleError(record)
else:
try:
self.console.print(log_renderable)
except Exception:
self.handleError(record)
def render_message(self, record: LogRecord, message: str) -> "ConsoleRenderable":
use_markup = getattr(record, "markup", self.markup)
message_text = Text.from_markup(message) if use_markup else Text(message)
highlighter = getattr(record, "highlighter", self.highlighter)
if highlighter:
message_text = highlighter(message_text)
if self.keywords is None:
self.keywords = self.KEYWORDS
if self.keywords:
message_text.highlight_words(self.keywords, "logging.keyword")
return message_text
def render(
self,
*,
record: LogRecord,
traceback: Optional[Traceback],
message_renderable: "ConsoleRenderable",
) -> "ConsoleRenderable":
path = Path(record.pathname).name
level = self.get_level_text(record)
time_format = None if self.formatter is None else self.formatter.datefmt
log_time = datetime.fromtimestamp(record.created)
log_renderable = self._log_render(
self.console,
[message_renderable] if not traceback else [message_renderable, traceback],
log_time=log_time,
time_format=time_format,
level=level,
path=path,
line_no=record.lineno,
link_path=record.pathname if self.enable_link_path else None,
)
return log_renderable
if __name__ == "__main__": from time import sleep
FORMAT = "%(message)s"
logging.basicConfig(
level="NOTSET",
format=FORMAT,
datefmt="[%X]",
handlers=[RichHandler(rich_tracebacks=True, tracebacks_show_locals=True)],
)
log = logging.getLogger("rich")
log.info("Server starting...")
log.info("Listening on http://127.0.0.1:8080")
sleep(1)
log.info("GET /index.html 200 1298")
log.info("GET /imgs/backgrounds/back1.jpg 200 54386")
log.info("GET /css/styles.css 200 54386")
log.warning("GET /favicon.ico 404 242")
sleep(1)
log.debug(
"JSONRPC request\n--> %r\n<-- %r",
{
"version": "1.1",
"method": "confirmFruitPurchase",
"params": [["apple", "orange", "mangoes", "pomelo"], 1.123],
"id": "194521489",
},
{"version": "1.1", "result": True, "error": None, "id": "194521489"},
)
log.debug(
"Loading configuration file /adasd/asdasd/qeqwe/qwrqwrqwr/sdgsdgsdg/werwerwer/dfgerert/ertertert/ertetert/werwerwer"
)
log.error("Unable to find 'pomelo' in database!")
log.info("POST /jsonrpc/ 200 65532")
log.info("POST /admin/ 401 42234")
log.warning("password was rejected for admin site.")
def divide() -> None:
number = 1
divisor = 0
foos = ["foo"] * 100
log.debug("in divide")
try:
number / divisor
except:
log.exception("An error of some kind occurred!")
divide()
sleep(1)
log.critical("Out of memory!")
log.info("Server exited with code=-1")
log.info("[bold]EXITING...[/bold]", extra=dict(markup=True))