Mirror of https://github.com/Cog-Creators/Red-DiscordBot.git (synced 2025-11-06 03:08:55 -05:00)
import logging.handlers
import pathlib
import re
import sys

from typing import List, Tuple, Optional, Union
from logging import LogRecord
from datetime import datetime  # This clearly never leads to confusion...
from os import isatty

from rich._log_render import LogRender
from rich.containers import Renderables
from rich.logging import RichHandler
from rich.table import Table
from rich.text import Text
from rich.traceback import Traceback


MAX_OLD_LOGS = 8


class RotatingFileHandler(logging.handlers.RotatingFileHandler):
    """Custom rotating file handler.

    This file handler rotates a bit differently to the one in stdlib.

    For a start, this works off of a "stem" and a "directory". The stem
    is the base name of the log file, without the extension. The
    directory is where all log files (including backups) will be placed.

    Secondly, this logger rotates files downwards, and new logs are
    *started* with the backup number incremented. The stdlib handler
    rotates files upwards, and this leaves the logs in reverse order.

    Thirdly, naming conventions are not customisable with this class.
    Logs will initially be named in the format "{stem}.log", and after
    rotating, the first log file will be renamed "{stem}-part1.log",
    and a new file "{stem}-part2.log" will be created for logging to
    continue.

    A few things can't be modified in this handler: it must use append
    mode, it doesn't support use of the `delay` arg, and it will ignore
    custom namers and rotators.

    When this handler is instantiated, it will search through the
    directory for logs from previous runtimes, and will open the file
    with the highest backup number to append to.
    """

    def __init__(
        self,
        stem: str,
        directory: pathlib.Path,
        maxBytes: int = 0,
        backupCount: int = 0,
        encoding: Optional[str] = None,
    ) -> None:
        self.baseStem = stem
        self.directory = directory.resolve()
        # Scan for existing files in directory, append to last part of existing log
        log_part_re = re.compile(rf"{stem}-part(?P<partnum>\d+).log")
        highest_part = 0
        for path in directory.iterdir():
            match = log_part_re.match(path.name)
            if match and int(match["partnum"]) > highest_part:
                highest_part = int(match["partnum"])
        if highest_part:
            filename = directory / f"{stem}-part{highest_part}.log"
        else:
            filename = directory / f"{stem}.log"
        super().__init__(
            filename,
            mode="a",
            maxBytes=maxBytes,
            backupCount=backupCount,
            encoding=encoding,
            delay=False,
        )

    def doRollover(self):
        if self.stream:
            self.stream.close()
            self.stream = None
        initial_path = self.directory / f"{self.baseStem}.log"
        if self.backupCount > 0 and initial_path.exists():
            initial_path.replace(self.directory / f"{self.baseStem}-part1.log")

        match = re.match(
            rf"{self.baseStem}(?:-part(?P<part>\d+)?)?.log", pathlib.Path(self.baseFilename).name
        )
        latest_part_num = int(match.groupdict(default="1").get("part", "1"))
        if self.backupCount < 1:
            # No backups, just delete the existing log and start again
            pathlib.Path(self.baseFilename).unlink()
        elif latest_part_num > self.backupCount:
            # Rotate files down one
            # red-part2.log becomes red-part1.log etc, a new log is added at the end.
            for i in range(1, self.backupCount):
                next_log = self.directory / f"{self.baseStem}-part{i + 1}.log"
                if next_log.exists():
                    prev_log = self.directory / f"{self.baseStem}-part{i}.log"
                    next_log.replace(prev_log)
        else:
            # Simply start a new file
            self.baseFilename = str(
                self.directory / f"{self.baseStem}-part{latest_part_num + 1}.log"
            )

        self.stream = self._open()


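# A minimal usage sketch of the handler above (hypothetical values; the real
# handlers are created in init_logging below):
#
#     handler = RotatingFileHandler(
#         stem="latest", directory=pathlib.Path("logs"), maxBytes=1_000_000, backupCount=8
#     )
#     # Writes start in logs/latest.log. On the first rollover that file is renamed
#     # logs/latest-part1.log and logging continues in a new logs/latest-part2.log.

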
class RedLogRender(LogRender):
    def __call__(
        self,
        console,
        renderables,
        log_time=None,
        time_format=None,
        level="",
        path=None,
        line_no=None,
        link_path=None,
        logger_name=None,
    ):
        output = Table.grid(padding=(0, 1))
        output.expand = True
        if self.show_time:
            output.add_column(style="log.time")
        if self.show_level:
            output.add_column(style="log.level", width=self.level_width)
        output.add_column(ratio=1, style="log.message", overflow="fold")
        if self.show_path and path:
            output.add_column(style="log.path")
        if logger_name:
            output.add_column()
        row = []
        if self.show_time:
            log_time = log_time or console.get_datetime()
            log_time_display = log_time.strftime(time_format or self.time_format)
            if log_time_display == self._last_time:
                row.append(Text(" " * len(log_time_display)))
            else:
                row.append(Text(log_time_display))
                self._last_time = log_time_display
        if self.show_level:
            row.append(level)

        row.append(Renderables(renderables))
        if self.show_path and path:
            path_text = Text()
            path_text.append(path, style=f"link file://{link_path}" if link_path else "")
            if line_no:
                path_text.append(f":{line_no}")
            row.append(path_text)

        if logger_name:
            logger_name_text = Text()
            logger_name_text.append(f"[{logger_name}]")
            row.append(logger_name_text)

        output.add_row(*row)
        return output


class RedRichHandler(RichHandler):
    """Adaptation of Rich's RichHandler to manually adjust the path to a logger name"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._log_render = RedLogRender(
            show_time=self._log_render.show_time,
            show_level=self._log_render.show_level,
            show_path=self._log_render.show_path,
            level_width=self._log_render.level_width,
        )

    def emit(self, record: LogRecord) -> None:
        """Invoked by logging."""
        path = pathlib.Path(record.pathname).name
        level = self.get_level_text(record)
        message = self.format(record)
        time_format = None if self.formatter is None else self.formatter.datefmt
        log_time = datetime.fromtimestamp(record.created)

        traceback = None
        if self.rich_tracebacks and record.exc_info and record.exc_info != (None, None, None):
            exc_type, exc_value, exc_traceback = record.exc_info
            assert exc_type is not None
            assert exc_value is not None
            traceback = Traceback.from_exception(
                exc_type,
                exc_value,
                exc_traceback,
                width=self.tracebacks_width,
                extra_lines=self.tracebacks_extra_lines,
                theme=self.tracebacks_theme,
                word_wrap=self.tracebacks_word_wrap,
                show_locals=self.tracebacks_show_locals,
                locals_max_length=self.locals_max_length,
                locals_max_string=self.locals_max_string,
            )
            message = record.getMessage()

        use_markup = getattr(record, "markup") if hasattr(record, "markup") else self.markup
        if use_markup:
            message_text = Text.from_markup(message)
        else:
            message_text = Text(message)

        if self.highlighter:
            message_text = self.highlighter(message_text)
        if self.KEYWORDS:
            message_text.highlight_words(self.KEYWORDS, "logging.keyword")

        self.console.print(
            self._log_render(
                self.console,
                [message_text] if not traceback else [message_text, traceback],
                log_time=log_time,
                time_format=time_format,
                level=level,
                path=path,
                line_no=record.lineno,
                link_path=record.pathname if self.enable_link_path else None,
                logger_name=record.name,
            )
        )


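# Roughly how this handler gets attached (init_logging below does this for real;
# the logger and formatter values here are only illustrative):
#
#     handler = RedRichHandler(rich_tracebacks=True, show_path=False)
#     handler.setFormatter(logging.Formatter("{message}", datefmt="[%X]", style="{"))
#     logging.getLogger().addHandler(handler)

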
def init_logging(
    level: int, location: pathlib.Path, force_rich_logging: Union[bool, None]
) -> None:
    root_logger = logging.getLogger()

    base_logger = logging.getLogger("red")
    base_logger.setLevel(level)
    dpy_logger = logging.getLogger("discord")
    dpy_logger.setLevel(logging.WARNING)
    warnings_logger = logging.getLogger("py.warnings")
    warnings_logger.setLevel(logging.WARNING)

    enable_rich_logging = False

    if isatty(0) and force_rich_logging is None:
        # Check if the bot thinks it has an active terminal.
        enable_rich_logging = True
    elif force_rich_logging is True:
        enable_rich_logging = True

    file_formatter = logging.Formatter(
        "[{asctime}] [{levelname}] {name}: {message}", datefmt="%Y-%m-%d %H:%M:%S", style="{"
    )
    if enable_rich_logging is True:
        rich_formatter = logging.Formatter("{message}", datefmt="[%X]", style="{")

        stdout_handler = RedRichHandler(rich_tracebacks=True, show_path=False)
        stdout_handler.setFormatter(rich_formatter)
    else:
        stdout_handler = logging.StreamHandler(sys.stdout)
        stdout_handler.setFormatter(file_formatter)

    root_logger.addHandler(stdout_handler)
    logging.captureWarnings(True)

    if not location.exists():
        location.mkdir(parents=True, exist_ok=True)
    # Rotate latest logs to previous logs
    previous_logs: List[pathlib.Path] = []
    latest_logs: List[Tuple[pathlib.Path, str]] = []
    for path in location.iterdir():
        match = re.match(r"latest(?P<part>-part\d+)?\.log", path.name)
        if match:
            part = match.groupdict(default="")["part"]
            latest_logs.append((path, part))
        match = re.match(r"previous(?:-part\d+)?.log", path.name)
        if match:
            previous_logs.append(path)
    # Delete all previous.log files
    for path in previous_logs:
        path.unlink()
    # Rename latest.log files to previous.log
    for path, part in latest_logs:
        path.replace(location / f"previous{part}.log")

    latest_fhandler = RotatingFileHandler(
        stem="latest",
        directory=location,
        maxBytes=1_000_000,  # About 1MB per logfile
        backupCount=MAX_OLD_LOGS,
        encoding="utf-8",
    )
    all_fhandler = RotatingFileHandler(
        stem="red",
        directory=location,
        maxBytes=1_000_000,
        backupCount=MAX_OLD_LOGS,
        encoding="utf-8",
    )

    for fhandler in (latest_fhandler, all_fhandler):
        fhandler.setFormatter(file_formatter)
        root_logger.addHandler(fhandler)
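

# A minimal, hypothetical call site. Red's launcher normally supplies these values
# from CLI flags; the level and directory below are assumptions for illustration:
#
#     init_logging(
#         level=logging.INFO,
#         location=pathlib.Path("logs"),
#         force_rich_logging=None,  # None -> auto-detect an interactive terminal via isatty(0)
#     )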