import logging.handlers import pathlib import re import sys from typing import List, Tuple, Optional, Union from logging import LogRecord from datetime import datetime # This clearly never leads to confusion... from os import isatty from rich._log_render import LogRender from rich.containers import Renderables from rich.logging import RichHandler from rich.table import Table from rich.text import Text from rich.traceback import Traceback MAX_OLD_LOGS = 8 class RotatingFileHandler(logging.handlers.RotatingFileHandler): """Custom rotating file handler. This file handler rotates a bit differently to the one in stdlib. For a start, this works off of a "stem" and a "directory". The stem is the base name of the log file, without the extension. The directory is where all log files (including backups) will be placed. Secondly, this logger rotates files downwards, and new logs are *started* with the backup number incremented. The stdlib handler rotates files upwards, and this leaves the logs in reverse order. Thirdly, naming conventions are not customisable with this class. Logs will initially be named in the format "{stem}.log", and after rotating, the first log file will be renamed "{stem}-part1.log", and a new file "{stem}-part2.log" will be created for logging to continue. A few things can't be modified in this handler: it must use append mode, it doesn't support use of the `delay` arg, and it will ignore custom namers and rotators. When this handler is instantiated, it will search through the directory for logs from previous runtimes, and will open the file with the highest backup number to append to. """ def __init__( self, stem: str, directory: pathlib.Path, maxBytes: int = 0, backupCount: int = 0, encoding: Optional[str] = None, ) -> None: self.baseStem = stem self.directory = directory.resolve() # Scan for existing files in directory, append to last part of existing log log_part_re = re.compile(rf"{stem}-part(?P\d+).log") highest_part = 0 for path in directory.iterdir(): match = log_part_re.match(path.name) if match and int(match["partnum"]) > highest_part: highest_part = int(match["partnum"]) if highest_part: filename = directory / f"{stem}-part{highest_part}.log" else: filename = directory / f"{stem}.log" super().__init__( filename, mode="a", maxBytes=maxBytes, backupCount=backupCount, encoding=encoding, delay=False, ) def doRollover(self): if self.stream: self.stream.close() self.stream = None initial_path = self.directory / f"{self.baseStem}.log" if self.backupCount > 0 and initial_path.exists(): initial_path.replace(self.directory / f"{self.baseStem}-part1.log") match = re.match( rf"{self.baseStem}(?:-part(?P\d+)?)?.log", pathlib.Path(self.baseFilename).name ) latest_part_num = int(match.groupdict(default="1").get("part", "1")) if self.backupCount < 1: # No backups, just delete the existing log and start again pathlib.Path(self.baseFilename).unlink() elif latest_part_num > self.backupCount: # Rotate files down one # red-part2.log becomes red-part1.log etc, a new log is added at the end. for i in range(1, self.backupCount): next_log = self.directory / f"{self.baseStem}-part{i + 1}.log" if next_log.exists(): prev_log = self.directory / f"{self.baseStem}-part{i}.log" next_log.replace(prev_log) else: # Simply start a new file self.baseFilename = str( self.directory / f"{self.baseStem}-part{latest_part_num + 1}.log" ) self.stream = self._open() class RedLogRender(LogRender): def __call__( self, console, renderables, log_time=None, time_format=None, level="", path=None, line_no=None, link_path=None, logger_name=None, ): output = Table.grid(padding=(0, 1)) output.expand = True if self.show_time: output.add_column(style="log.time") if self.show_level: output.add_column(style="log.level", width=self.level_width) output.add_column(ratio=1, style="log.message", overflow="fold") if self.show_path and path: output.add_column(style="log.path") if logger_name: output.add_column() row = [] if self.show_time: log_time = log_time or console.get_datetime() log_time_display = log_time.strftime(time_format or self.time_format) if log_time_display == self._last_time: row.append(Text(" " * len(log_time_display))) else: row.append(Text(log_time_display)) self._last_time = log_time_display if self.show_level: row.append(level) row.append(Renderables(renderables)) if self.show_path and path: path_text = Text() path_text.append(path, style=f"link file://{link_path}" if link_path else "") if line_no: path_text.append(f":{line_no}") row.append(path_text) if logger_name: logger_name_text = Text() logger_name_text.append(f"[{logger_name}]") row.append(logger_name_text) output.add_row(*row) return output class RedRichHandler(RichHandler): """Adaptation of Rich's RichHandler to manually adjust the path to a logger name""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._log_render = RedLogRender( show_time=self._log_render.show_time, show_level=self._log_render.show_level, show_path=self._log_render.show_path, level_width=self._log_render.level_width, ) def emit(self, record: LogRecord) -> None: """Invoked by logging.""" path = pathlib.Path(record.pathname).name level = self.get_level_text(record) message = self.format(record) time_format = None if self.formatter is None else self.formatter.datefmt log_time = datetime.fromtimestamp(record.created) traceback = None if self.rich_tracebacks and record.exc_info and record.exc_info != (None, None, None): exc_type, exc_value, exc_traceback = record.exc_info assert exc_type is not None assert exc_value is not None traceback = Traceback.from_exception( exc_type, exc_value, exc_traceback, width=self.tracebacks_width, extra_lines=self.tracebacks_extra_lines, theme=self.tracebacks_theme, word_wrap=self.tracebacks_word_wrap, show_locals=self.tracebacks_show_locals, locals_max_length=self.locals_max_length, locals_max_string=self.locals_max_string, ) message = record.getMessage() use_markup = getattr(record, "markup") if hasattr(record, "markup") else self.markup if use_markup: message_text = Text.from_markup(message) else: message_text = Text(message) if self.highlighter: message_text = self.highlighter(message_text) if self.KEYWORDS: message_text.highlight_words(self.KEYWORDS, "logging.keyword") self.console.print( self._log_render( self.console, [message_text] if not traceback else [message_text, traceback], log_time=log_time, time_format=time_format, level=level, path=path, line_no=record.lineno, link_path=record.pathname if self.enable_link_path else None, logger_name=record.name, ) ) def init_logging( level: int, location: pathlib.Path, force_rich_logging: Union[bool, None] ) -> None: root_logger = logging.getLogger() base_logger = logging.getLogger("red") base_logger.setLevel(level) dpy_logger = logging.getLogger("discord") dpy_logger.setLevel(logging.WARNING) warnings_logger = logging.getLogger("py.warnings") warnings_logger.setLevel(logging.WARNING) enable_rich_logging = False if isatty(0) and force_rich_logging is None: # Check if the bot thinks it has a active terminal. enable_rich_logging = True elif force_rich_logging is True: enable_rich_logging = True file_formatter = logging.Formatter( "[{asctime}] [{levelname}] {name}: {message}", datefmt="%Y-%m-%d %H:%M:%S", style="{" ) if enable_rich_logging is True: rich_formatter = logging.Formatter("{message}", datefmt="[%X]", style="{") stdout_handler = RedRichHandler(rich_tracebacks=True, show_path=False) stdout_handler.setFormatter(rich_formatter) else: stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(file_formatter) root_logger.addHandler(stdout_handler) logging.captureWarnings(True) if not location.exists(): location.mkdir(parents=True, exist_ok=True) # Rotate latest logs to previous logs previous_logs: List[pathlib.Path] = [] latest_logs: List[Tuple[pathlib.Path, str]] = [] for path in location.iterdir(): match = re.match(r"latest(?P-part\d+)?\.log", path.name) if match: part = match.groupdict(default="")["part"] latest_logs.append((path, part)) match = re.match(r"previous(?:-part\d+)?.log", path.name) if match: previous_logs.append(path) # Delete all previous.log files for path in previous_logs: path.unlink() # Rename latest.log files to previous.log for path, part in latest_logs: path.replace(location / f"previous{part}.log") latest_fhandler = RotatingFileHandler( stem="latest", directory=location, maxBytes=1_000_000, # About 1MB per logfile backupCount=MAX_OLD_LOGS, encoding="utf-8", ) all_fhandler = RotatingFileHandler( stem="red", directory=location, maxBytes=1_000_000, backupCount=MAX_OLD_LOGS, encoding="utf-8", ) for fhandler in (latest_fhandler, all_fhandler): fhandler.setFormatter(file_formatter) root_logger.addHandler(fhandler)