mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 03:08:55 -05:00
Show source code in tracebacks sent by Dev cog commands (#5843)
Co-authored-by: Kreusada <67752638+Kreusada@users.noreply.github.com>
This commit is contained in:
parent
aa51fd9ad1
commit
2c4bd38ba1
2
.github/labeler.yml
vendored
2
.github/labeler.yml
vendored
@ -41,6 +41,8 @@
|
||||
- redbot/core/dev_commands.py
|
||||
# Docs
|
||||
- docs/cog_guides/dev.rst
|
||||
# Tests
|
||||
- tests/core/test_dev_commands.py
|
||||
"Category: Cogs - Downloader":
|
||||
# Source
|
||||
- redbot/cogs/downloader/*
|
||||
|
||||
@ -10,6 +10,8 @@ The original copy was distributed under MIT License and this derivative work
|
||||
is distributed under GNU GPL Version 3.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import asyncio
|
||||
import aiohttp
|
||||
@ -19,8 +21,10 @@ import textwrap
|
||||
import traceback
|
||||
import types
|
||||
import re
|
||||
from contextlib import redirect_stdout
|
||||
import sys
|
||||
from copy import copy
|
||||
from typing import Any, Awaitable, Dict, Iterator, Literal, Type, TypeVar, Union
|
||||
from types import CodeType, TracebackType
|
||||
|
||||
import discord
|
||||
|
||||
@ -33,72 +37,297 @@ from .utils.predicates import MessagePredicate
|
||||
|
||||
_ = Translator("Dev", __file__)
|
||||
|
||||
START_CODE_BLOCK_RE = re.compile(r"^((```py(thon)?)(?=\s)|(```))")
|
||||
# we want to match either:
|
||||
# - "```lang\n"
|
||||
# - or "```" and potentially also strip a single "\n" if it follows it immediately
|
||||
START_CODE_BLOCK_RE = re.compile(r"^((```[\w.+\-]+\n+(?!```))|(```\n*))")
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def get_pages(msg: str) -> Iterator[str]:
|
||||
"""Pagify the given message for output to the user."""
|
||||
return pagify(msg, delims=["\n", " "], priority=True, shorten_by=10)
|
||||
|
||||
|
||||
def sanitize_output(ctx: commands.Context, to_sanitize: str) -> str:
|
||||
"""Hides the bot's token from a string."""
|
||||
token = ctx.bot.http.token
|
||||
if token:
|
||||
return re.sub(re.escape(token), "[EXPUNGED]", to_sanitize, re.I)
|
||||
return to_sanitize
|
||||
|
||||
|
||||
def async_compile(source: str, filename: str, mode: Literal["eval", "exec"]) -> CodeType:
|
||||
return compile(
|
||||
source, filename, mode, flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT, optimize=0, dont_inherit=True
|
||||
)
|
||||
|
||||
|
||||
async def maybe_await(coro: Union[T, Awaitable[T], Awaitable[Awaitable[T]]]) -> T:
|
||||
for i in range(2):
|
||||
if inspect.isawaitable(coro):
|
||||
coro = await coro
|
||||
else:
|
||||
break
|
||||
return coro # type: ignore
|
||||
|
||||
|
||||
def cleanup_code(content: str) -> str:
|
||||
"""Automatically removes code blocks from the code."""
|
||||
# remove ```py\n```
|
||||
if content.startswith("```") and content.endswith("```"):
|
||||
return START_CODE_BLOCK_RE.sub("", content)[:-3].rstrip("\n")
|
||||
|
||||
# remove `foo`
|
||||
return content.strip("` \n")
|
||||
|
||||
|
||||
class DevOutput:
|
||||
def __init__(
|
||||
self, ctx: commands.Context, *, source: str, filename: str, env: Dict[str, Any]
|
||||
) -> None:
|
||||
self.ctx = ctx
|
||||
self.source = source
|
||||
self.filename = filename
|
||||
self.env = env
|
||||
self.always_include_result = False
|
||||
self._stream = io.StringIO()
|
||||
self.formatted_exc = ""
|
||||
self.result: Any = None
|
||||
self._old_streams = []
|
||||
|
||||
@property
|
||||
def source(self) -> str:
|
||||
return self._original_source
|
||||
|
||||
@source.setter
|
||||
def source(self, value: str) -> None:
|
||||
self._source = self._original_source = value
|
||||
|
||||
def __str__(self) -> str:
|
||||
output = []
|
||||
printed = self._stream.getvalue()
|
||||
if printed:
|
||||
output.append(printed)
|
||||
if self.formatted_exc:
|
||||
output.append(self.formatted_exc)
|
||||
elif self.always_include_result or self.result is not None:
|
||||
try:
|
||||
output.append(str(self.result))
|
||||
except Exception as exc:
|
||||
output.append(self.format_exception(exc))
|
||||
return sanitize_output(self.ctx, "".join(output))
|
||||
|
||||
async def send(self, *, tick: bool = True) -> None:
|
||||
await self.ctx.send_interactive(get_pages(str(self)), box_lang="py")
|
||||
if tick and not self.formatted_exc:
|
||||
await self.ctx.tick()
|
||||
|
||||
def set_exception(self, exc: Exception, *, line_offset: int = 0, skip_frames: int = 1) -> None:
|
||||
self.formatted_exc = self.format_exception(
|
||||
exc, line_offset=line_offset, skip_frames=skip_frames
|
||||
)
|
||||
|
||||
def __enter__(self) -> None:
|
||||
self._old_streams.append(sys.stdout)
|
||||
sys.stdout = self._stream
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc_value: Optional[BaseException],
|
||||
exc_tb: Optional[TracebackType],
|
||||
/,
|
||||
) -> None:
|
||||
sys.stdout = self._old_streams.pop()
|
||||
|
||||
@classmethod
|
||||
async def from_debug(
|
||||
cls, ctx: commands.Context, *, source: str, env: Dict[str, Any]
|
||||
) -> DevOutput:
|
||||
output = cls(ctx, source=source, filename="<debug command>", env=env)
|
||||
await output.run_debug()
|
||||
return output
|
||||
|
||||
@classmethod
|
||||
async def from_eval(
|
||||
cls, ctx: commands.Context, *, source: str, env: Dict[str, Any]
|
||||
) -> DevOutput:
|
||||
output = cls(ctx, source=source, filename="<eval command>", env=env)
|
||||
await output.run_eval()
|
||||
return output
|
||||
|
||||
@classmethod
|
||||
async def from_repl(
|
||||
cls, ctx: commands.Context, *, source: str, env: Dict[str, Any]
|
||||
) -> DevOutput:
|
||||
output = cls(ctx, source=source, filename="<repl session>", env=env)
|
||||
await output.run_repl()
|
||||
return output
|
||||
|
||||
async def run_debug(self) -> None:
|
||||
self.always_include_result = True
|
||||
self._source = self.source
|
||||
try:
|
||||
compiled = self.async_compile_with_eval()
|
||||
except SyntaxError as exc:
|
||||
self.set_exception(exc, skip_frames=3)
|
||||
return
|
||||
|
||||
try:
|
||||
self.result = await maybe_await(eval(compiled, self.env))
|
||||
except Exception as exc:
|
||||
self.set_exception(exc)
|
||||
|
||||
async def run_eval(self) -> None:
|
||||
self.always_include_result = False
|
||||
self._source = "async def func():\n%s" % textwrap.indent(self.source, " ")
|
||||
try:
|
||||
compiled = self.async_compile_with_exec()
|
||||
exec(compiled, self.env)
|
||||
except SyntaxError as exc:
|
||||
self.set_exception(exc, line_offset=1, skip_frames=3)
|
||||
return
|
||||
|
||||
func = self.env["func"]
|
||||
try:
|
||||
with self:
|
||||
self.result = await func()
|
||||
except Exception as exc:
|
||||
self.set_exception(exc, line_offset=1)
|
||||
|
||||
async def run_repl(self) -> None:
|
||||
self.always_include_result = False
|
||||
self._source = self.source
|
||||
executor = None
|
||||
if self.source.count("\n") == 0:
|
||||
# single statement, potentially 'eval'
|
||||
try:
|
||||
code = self.async_compile_with_eval()
|
||||
except SyntaxError:
|
||||
pass
|
||||
else:
|
||||
executor = eval
|
||||
|
||||
if executor is None:
|
||||
try:
|
||||
code = self.async_compile_with_exec()
|
||||
except SyntaxError as exc:
|
||||
self.set_exception(exc, skip_frames=3)
|
||||
return
|
||||
|
||||
try:
|
||||
with self:
|
||||
if executor is None:
|
||||
result = types.FunctionType(code, self.env)()
|
||||
else:
|
||||
result = executor(code, self.env)
|
||||
self.result = await maybe_await(result)
|
||||
except Exception as exc:
|
||||
self.set_exception(exc)
|
||||
else:
|
||||
if self.result is not None:
|
||||
self.env["_"] = self.result
|
||||
|
||||
def async_compile_with_exec(self) -> CodeType:
|
||||
return async_compile(self._source, self.filename, "exec")
|
||||
|
||||
def async_compile_with_eval(self) -> CodeType:
|
||||
return async_compile(self._source, self.filename, "eval")
|
||||
|
||||
def format_exception(
|
||||
self, exc: Exception, *, line_offset: int = 0, skip_frames: int = 1
|
||||
) -> str:
|
||||
"""
|
||||
Format an exception to send to the user.
|
||||
|
||||
This function makes a few alterations to the traceback:
|
||||
- First `skip_frames` frames are skipped so that we don't show the frames
|
||||
that are part of Red's code to the user
|
||||
- `FrameSummary` objects that we get from traceback module are updated
|
||||
with the string for the corresponding line of code as otherwise
|
||||
the generated traceback string wouldn't show user's code.
|
||||
- If `line_offset` is passed, this function subtracts it from line numbers
|
||||
in `FrameSummary` objects so that those numbers properly correspond to
|
||||
the code that was provided by the user. This is needed for cases where
|
||||
we wrap user's code in an async function before exec-ing it.
|
||||
"""
|
||||
exc_type = type(exc)
|
||||
tb = exc.__traceback__
|
||||
for x in range(skip_frames):
|
||||
if tb is None:
|
||||
break
|
||||
tb = tb.tb_next
|
||||
|
||||
# To mimic linecache module's behavior,
|
||||
# all lines (including the last one) should end with \n.
|
||||
source_lines = [f"{line}\n" for line in self._source.splitlines()]
|
||||
filename = self.filename
|
||||
# sometimes SyntaxError.text is None, sometimes it isn't
|
||||
if (
|
||||
issubclass(exc_type, SyntaxError)
|
||||
and exc.filename == filename
|
||||
and exc.lineno is not None
|
||||
):
|
||||
if exc.text is None:
|
||||
# line numbers are 1-based, the list indexes are 0-based
|
||||
exc.text = source_lines[exc.lineno - 1]
|
||||
exc.lineno -= line_offset
|
||||
|
||||
traceback_exc = traceback.TracebackException(exc_type, exc, tb)
|
||||
py311_or_above = sys.version_info >= (3, 11)
|
||||
stack_summary = traceback_exc.stack
|
||||
for idx, frame_summary in enumerate(stack_summary):
|
||||
if frame_summary.filename != filename:
|
||||
continue
|
||||
lineno = frame_summary.lineno
|
||||
if lineno is None:
|
||||
continue
|
||||
|
||||
# line numbers are 1-based, the list indexes are 0-based
|
||||
line = source_lines[lineno - 1]
|
||||
lineno -= line_offset
|
||||
# support for enhanced error locations in tracebacks
|
||||
if py311_or_above:
|
||||
end_lineno = frame_summary.end_lineno
|
||||
if end_lineno is not None:
|
||||
end_lineno -= line_offset
|
||||
frame_summary = traceback.FrameSummary(
|
||||
frame_summary.filename,
|
||||
lineno,
|
||||
frame_summary.name,
|
||||
line=line,
|
||||
end_lineno=end_lineno,
|
||||
colno=frame_summary.colno,
|
||||
end_colno=frame_summary.end_colno,
|
||||
)
|
||||
else:
|
||||
frame_summary = traceback.FrameSummary(
|
||||
frame_summary.filename, lineno, frame_summary.name, line=line
|
||||
)
|
||||
stack_summary[idx] = frame_summary
|
||||
|
||||
return "".join(traceback_exc.format())
|
||||
|
||||
|
||||
@cog_i18n(_)
|
||||
class Dev(commands.Cog):
|
||||
"""Various development focused utilities."""
|
||||
|
||||
async def red_delete_data_for_user(self, **kwargs):
|
||||
async def red_delete_data_for_user(self, **kwargs: Any) -> None:
|
||||
"""
|
||||
Because despite my best efforts to advise otherwise,
|
||||
people use ``--dev`` in production
|
||||
"""
|
||||
return
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._last_result = None
|
||||
self.sessions = {}
|
||||
self.env_extensions = {}
|
||||
|
||||
@staticmethod
|
||||
def async_compile(source, filename, mode):
|
||||
return compile(source, filename, mode, flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT, optimize=0)
|
||||
|
||||
@staticmethod
|
||||
async def maybe_await(coro):
|
||||
for i in range(2):
|
||||
if inspect.isawaitable(coro):
|
||||
coro = await coro
|
||||
else:
|
||||
return coro
|
||||
return coro
|
||||
|
||||
@staticmethod
|
||||
def cleanup_code(content):
|
||||
"""Automatically removes code blocks from the code."""
|
||||
# remove ```py\n```
|
||||
if content.startswith("```") and content.endswith("```"):
|
||||
return START_CODE_BLOCK_RE.sub("", content)[:-3]
|
||||
|
||||
# remove `foo`
|
||||
return content.strip("` \n")
|
||||
|
||||
@classmethod
|
||||
def get_syntax_error(cls, e):
|
||||
"""Format a syntax error to send to the user.
|
||||
|
||||
Returns a string representation of the error formatted as a codeblock.
|
||||
"""
|
||||
if e.text is None:
|
||||
return cls.get_pages("{0.__class__.__name__}: {0}".format(e))
|
||||
return cls.get_pages(
|
||||
"{0.text}\n{1:>{0.offset}}\n{2}: {0}".format(e, "^", type(e).__name__)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_pages(msg: str):
|
||||
"""Pagify the given message for output to the user."""
|
||||
return pagify(msg, delims=["\n", " "], priority=True, shorten_by=10)
|
||||
|
||||
@staticmethod
|
||||
def sanitize_output(ctx: commands.Context, input_: str) -> str:
|
||||
"""Hides the bot's token from a string."""
|
||||
token = ctx.bot.http.token
|
||||
return re.sub(re.escape(token), "[EXPUNGED]", input_, re.I)
|
||||
|
||||
def get_environment(self, ctx: commands.Context) -> dict:
|
||||
env = {
|
||||
"bot": ctx.bot,
|
||||
@ -118,14 +347,14 @@ class Dev(commands.Cog):
|
||||
for name, value in self.env_extensions.items():
|
||||
try:
|
||||
env[name] = value(ctx)
|
||||
except Exception as e:
|
||||
traceback.clear_frames(e.__traceback__)
|
||||
env[name] = e
|
||||
except Exception as exc:
|
||||
traceback.clear_frames(exc.__traceback__)
|
||||
env[name] = exc
|
||||
return env
|
||||
|
||||
@commands.command()
|
||||
@commands.is_owner()
|
||||
async def debug(self, ctx, *, code):
|
||||
async def debug(self, ctx: commands.Context, *, code: str) -> None:
|
||||
"""Evaluate a statement of python code.
|
||||
|
||||
The bot will always respond with the return value of the code.
|
||||
@ -151,29 +380,15 @@ class Dev(commands.Cog):
|
||||
`cf` - the redbot.core.utils.chat_formatting module
|
||||
"""
|
||||
env = self.get_environment(ctx)
|
||||
code = self.cleanup_code(code)
|
||||
source = cleanup_code(code)
|
||||
|
||||
try:
|
||||
compiled = self.async_compile(code, "<string>", "eval")
|
||||
result = await self.maybe_await(eval(compiled, env))
|
||||
except SyntaxError as e:
|
||||
await ctx.send_interactive(self.get_syntax_error(e), box_lang="py")
|
||||
return
|
||||
except Exception as e:
|
||||
await ctx.send_interactive(
|
||||
self.get_pages("{}: {!s}".format(type(e).__name__, e)), box_lang="py"
|
||||
)
|
||||
return
|
||||
|
||||
self._last_result = result
|
||||
result = self.sanitize_output(ctx, str(result))
|
||||
|
||||
await ctx.tick()
|
||||
await ctx.send_interactive(self.get_pages(result), box_lang="py")
|
||||
output = await DevOutput.from_debug(ctx, source=source, env=env)
|
||||
self._last_result = output.result
|
||||
await output.send()
|
||||
|
||||
@commands.command(name="eval")
|
||||
@commands.is_owner()
|
||||
async def _eval(self, ctx, *, body: str):
|
||||
async def _eval(self, ctx: commands.Context, *, body: str) -> None:
|
||||
"""Execute asynchronous code.
|
||||
|
||||
This command wraps code into the body of an async function and then
|
||||
@ -198,40 +413,16 @@ class Dev(commands.Cog):
|
||||
`cf` - the redbot.core.utils.chat_formatting module
|
||||
"""
|
||||
env = self.get_environment(ctx)
|
||||
body = self.cleanup_code(body)
|
||||
stdout = io.StringIO()
|
||||
source = cleanup_code(body)
|
||||
|
||||
to_compile = "async def func():\n%s" % textwrap.indent(body, " ")
|
||||
|
||||
try:
|
||||
compiled = self.async_compile(to_compile, "<string>", "exec")
|
||||
exec(compiled, env)
|
||||
except SyntaxError as e:
|
||||
return await ctx.send_interactive(self.get_syntax_error(e), box_lang="py")
|
||||
|
||||
func = env["func"]
|
||||
result = None
|
||||
try:
|
||||
with redirect_stdout(stdout):
|
||||
result = await func()
|
||||
except Exception:
|
||||
printed = "{}{}".format(stdout.getvalue(), traceback.format_exc())
|
||||
else:
|
||||
printed = stdout.getvalue()
|
||||
await ctx.tick()
|
||||
|
||||
if result is not None:
|
||||
self._last_result = result
|
||||
msg = "{}{}".format(printed, result)
|
||||
else:
|
||||
msg = printed
|
||||
msg = self.sanitize_output(ctx, msg)
|
||||
|
||||
await ctx.send_interactive(self.get_pages(msg), box_lang="py")
|
||||
output = await DevOutput.from_eval(ctx, source=source, env=env)
|
||||
if output.result is not None:
|
||||
self._last_result = output.result
|
||||
await output.send()
|
||||
|
||||
@commands.group(invoke_without_command=True)
|
||||
@commands.is_owner()
|
||||
async def repl(self, ctx):
|
||||
async def repl(self, ctx: commands.Context) -> None:
|
||||
"""Open an interactive REPL.
|
||||
|
||||
The REPL will only recognise code as messages which start with a
|
||||
@ -280,71 +471,28 @@ class Dev(commands.Cog):
|
||||
|
||||
while True:
|
||||
response = await ctx.bot.wait_for("message", check=MessagePredicate.regex(r"^`", ctx))
|
||||
env["message"] = response
|
||||
|
||||
if not self.sessions[ctx.channel.id]:
|
||||
continue
|
||||
|
||||
cleaned = self.cleanup_code(response.content)
|
||||
source = cleanup_code(response.content)
|
||||
|
||||
if cleaned in ("quit", "exit", "exit()"):
|
||||
if source in ("quit", "exit", "exit()"):
|
||||
await ctx.send(_("Exiting."))
|
||||
del self.sessions[ctx.channel.id]
|
||||
return
|
||||
|
||||
executor = None
|
||||
if cleaned.count("\n") == 0:
|
||||
# single statement, potentially 'eval'
|
||||
output = await DevOutput.from_repl(ctx, source=source, env=env)
|
||||
try:
|
||||
code = self.async_compile(cleaned, "<repl session>", "eval")
|
||||
except SyntaxError:
|
||||
pass
|
||||
else:
|
||||
executor = eval
|
||||
|
||||
if executor is None:
|
||||
try:
|
||||
code = self.async_compile(cleaned, "<repl session>", "exec")
|
||||
except SyntaxError as e:
|
||||
await ctx.send_interactive(self.get_syntax_error(e), box_lang="py")
|
||||
continue
|
||||
|
||||
env["message"] = response
|
||||
stdout = io.StringIO()
|
||||
|
||||
msg = ""
|
||||
|
||||
try:
|
||||
with redirect_stdout(stdout):
|
||||
if executor is None:
|
||||
result = types.FunctionType(code, env)()
|
||||
else:
|
||||
result = executor(code, env)
|
||||
result = await self.maybe_await(result)
|
||||
except Exception:
|
||||
value = stdout.getvalue()
|
||||
msg = "{}{}".format(value, traceback.format_exc())
|
||||
else:
|
||||
value = stdout.getvalue()
|
||||
if result is not None:
|
||||
try:
|
||||
msg = "{}{}".format(value, result)
|
||||
except Exception:
|
||||
msg = "{}{}".format(value, traceback.format_exc())
|
||||
env["_"] = result
|
||||
elif value:
|
||||
msg = "{}".format(value)
|
||||
|
||||
msg = self.sanitize_output(ctx, msg)
|
||||
|
||||
try:
|
||||
await ctx.send_interactive(self.get_pages(msg), box_lang="py")
|
||||
await output.send(tick=False)
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
except discord.HTTPException as e:
|
||||
await ctx.send(_("Unexpected error: `{}`").format(e))
|
||||
except discord.HTTPException as exc:
|
||||
await ctx.send(_("Unexpected error: ") + str(exc))
|
||||
|
||||
@repl.command(aliases=["resume"])
|
||||
async def pause(self, ctx, toggle: Optional[bool] = None):
|
||||
async def pause(self, ctx: commands.Context, toggle: Optional[bool] = None) -> None:
|
||||
"""Pauses/resumes the REPL running in the current channel."""
|
||||
if ctx.channel.id not in self.sessions:
|
||||
await ctx.send(_("There is no currently running REPL session in this channel."))
|
||||
@ -362,7 +510,7 @@ class Dev(commands.Cog):
|
||||
@commands.guild_only()
|
||||
@commands.command()
|
||||
@commands.is_owner()
|
||||
async def mock(self, ctx, user: discord.Member, *, command):
|
||||
async def mock(self, ctx: commands.Context, user: discord.Member, *, command: str) -> None:
|
||||
"""Mock another user invoking a command.
|
||||
|
||||
The prefix must not be entered.
|
||||
@ -376,7 +524,9 @@ class Dev(commands.Cog):
|
||||
@commands.guild_only()
|
||||
@commands.command(name="mockmsg")
|
||||
@commands.is_owner()
|
||||
async def mock_msg(self, ctx, user: discord.Member, *, content: str = ""):
|
||||
async def mock_msg(
|
||||
self, ctx: commands.Context, user: discord.Member, *, content: str = ""
|
||||
) -> None:
|
||||
"""Dispatch a message event as if it were sent by a different user.
|
||||
|
||||
Current message is used as a base (including attachments, embeds, etc.),
|
||||
@ -397,7 +547,7 @@ class Dev(commands.Cog):
|
||||
|
||||
@commands.command()
|
||||
@commands.is_owner()
|
||||
async def bypasscooldowns(self, ctx, toggle: Optional[bool] = None):
|
||||
async def bypasscooldowns(self, ctx: commands.Context, toggle: Optional[bool] = None) -> None:
|
||||
"""Give bot owners the ability to bypass cooldowns.
|
||||
|
||||
Does not persist through restarts."""
|
||||
|
||||
391
tests/core/test_dev_commands.py
Normal file
391
tests/core/test_dev_commands.py
Normal file
@ -0,0 +1,391 @@
|
||||
import sys
|
||||
import textwrap
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from redbot.core import commands
|
||||
from redbot.core.dev_commands import DevOutput, cleanup_code
|
||||
|
||||
|
||||
# the examples are based on how the markdown ends up being rendered by Discord
|
||||
@pytest.mark.parametrize(
|
||||
"content,source",
|
||||
(
|
||||
# no markdown to strip
|
||||
(
|
||||
"x = 1",
|
||||
"x = 1",
|
||||
),
|
||||
# inline with single backticks
|
||||
(
|
||||
"`x = 1`",
|
||||
"x = 1",
|
||||
),
|
||||
# inline with double backticks
|
||||
(
|
||||
"``x = 1``",
|
||||
"x = 1",
|
||||
),
|
||||
# code block within a single line
|
||||
(
|
||||
"```x = 1```",
|
||||
"x = 1",
|
||||
),
|
||||
# code block with code in first line and closing backquotes in separate line
|
||||
(
|
||||
"""\
|
||||
```x = 1
|
||||
```""",
|
||||
"x = 1",
|
||||
),
|
||||
# code block with closing backquotes in same line
|
||||
(
|
||||
"""\
|
||||
```
|
||||
x = 1```""",
|
||||
"x = 1",
|
||||
),
|
||||
# code block with opening and closing backquotes in separate lines
|
||||
(
|
||||
"""\
|
||||
```
|
||||
x = 1
|
||||
```""",
|
||||
"x = 1",
|
||||
),
|
||||
# code block with language specified and closing backquotes in separate line
|
||||
(
|
||||
"""\
|
||||
```py
|
||||
x = 1
|
||||
```""",
|
||||
"x = 1",
|
||||
),
|
||||
(
|
||||
"""\
|
||||
```python
|
||||
x = 1
|
||||
```""",
|
||||
"x = 1",
|
||||
),
|
||||
# code block with language specified and closing backquotes in same line
|
||||
(
|
||||
"""\
|
||||
```py
|
||||
x = 1```""",
|
||||
"x = 1",
|
||||
),
|
||||
(
|
||||
"""\
|
||||
```python
|
||||
x = 1```""",
|
||||
"x = 1",
|
||||
),
|
||||
# code block with the only line of code being a potentially valid language name
|
||||
# ('pass' is just a combination of letters) and being right after opening backquotes.
|
||||
(
|
||||
"""\
|
||||
```pass
|
||||
```""",
|
||||
"pass",
|
||||
),
|
||||
# leading newline characters should get stripped, ending backquotes on separate line
|
||||
(
|
||||
"""\
|
||||
```
|
||||
|
||||
|
||||
x = 1
|
||||
```""",
|
||||
"x = 1",
|
||||
),
|
||||
(
|
||||
"""\
|
||||
```python
|
||||
|
||||
|
||||
x = 1
|
||||
```""",
|
||||
"x = 1",
|
||||
),
|
||||
# leading newline characters should get stripped, ending backquotes on same line
|
||||
(
|
||||
"""\
|
||||
```
|
||||
|
||||
|
||||
x = 1```""",
|
||||
"x = 1",
|
||||
),
|
||||
(
|
||||
"""\
|
||||
```python
|
||||
|
||||
|
||||
x = 1```""",
|
||||
"x = 1",
|
||||
),
|
||||
),
|
||||
)
|
||||
def test_cleanup_code(content: str, source: str) -> None:
|
||||
content = textwrap.dedent(content)
|
||||
source = textwrap.dedent(source)
|
||||
assert cleanup_code(content) == source
|
||||
|
||||
|
||||
def _get_dev_output(source: str) -> DevOutput:
|
||||
return DevOutput(
|
||||
MagicMock(spec=commands.Context),
|
||||
source=source,
|
||||
filename="<test run>",
|
||||
env={"__builtins__": __builtins__, "__name__": "__main__", "_": None},
|
||||
)
|
||||
|
||||
|
||||
async def _run_dev_output(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
source: str,
|
||||
result: str,
|
||||
*,
|
||||
debug: bool = False,
|
||||
eval: bool = False,
|
||||
repl: bool = False,
|
||||
) -> None:
|
||||
source = textwrap.dedent(source)
|
||||
result = textwrap.dedent(result)
|
||||
monkeypatch.setattr("redbot.core.dev_commands.sanitize_output", lambda ctx, s: s)
|
||||
|
||||
if debug:
|
||||
output = _get_dev_output(source)
|
||||
await output.run_debug()
|
||||
assert str(output) == result
|
||||
# ensure that our Context mock is never actually used by anything
|
||||
assert not output.ctx.mock_calls
|
||||
|
||||
if eval:
|
||||
output = _get_dev_output(source.replace("<module>", "func"))
|
||||
await output.run_eval()
|
||||
assert str(output) == result.replace("<module>", "func")
|
||||
# ensure that our Context mock is never actually used by anything
|
||||
assert not output.ctx.mock_calls
|
||||
|
||||
if repl:
|
||||
output = _get_dev_output(source)
|
||||
await output.run_repl()
|
||||
assert str(output) == result
|
||||
# ensure that our Context mock is never actually used by anything
|
||||
assert not output.ctx.mock_calls
|
||||
|
||||
|
||||
EXPRESSION_TESTS = {
|
||||
# invalid syntax
|
||||
"12x\n": (
|
||||
(
|
||||
lambda v: v < (3, 10),
|
||||
"""\
|
||||
File "<test run>", line 1
|
||||
12x
|
||||
^
|
||||
SyntaxError: invalid syntax
|
||||
""",
|
||||
),
|
||||
(
|
||||
lambda v: v >= (3, 10),
|
||||
"""\
|
||||
File "<test run>", line 1
|
||||
12x
|
||||
^
|
||||
SyntaxError: invalid decimal literal
|
||||
""",
|
||||
),
|
||||
),
|
||||
"foo(x, z for z in range(10), t, w)": (
|
||||
(
|
||||
lambda v: v < (3, 10),
|
||||
"""\
|
||||
File "<test run>", line 1
|
||||
foo(x, z for z in range(10), t, w)
|
||||
^
|
||||
SyntaxError: Generator expression must be parenthesized
|
||||
""",
|
||||
),
|
||||
(
|
||||
lambda v: v >= (3, 10),
|
||||
"""\
|
||||
File "<test run>", line 1
|
||||
foo(x, z for z in range(10), t, w)
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
SyntaxError: Generator expression must be parenthesized
|
||||
""",
|
||||
),
|
||||
),
|
||||
# exception raised
|
||||
"abs(1 / 0)": (
|
||||
(
|
||||
lambda v: v < (3, 11),
|
||||
"""\
|
||||
Traceback (most recent call last):
|
||||
File "<test run>", line 1, in <module>
|
||||
abs(1 / 0)
|
||||
ZeroDivisionError: division by zero
|
||||
""",
|
||||
),
|
||||
(
|
||||
lambda v: v >= (3, 11),
|
||||
"""\
|
||||
Traceback (most recent call last):
|
||||
File "<test run>", line 1, in <module>
|
||||
abs(1 / 0)
|
||||
~~^~~
|
||||
ZeroDivisionError: division by zero
|
||||
""",
|
||||
),
|
||||
),
|
||||
}
|
||||
STATEMENT_TESTS = {
|
||||
# invalid syntax
|
||||
"""\
|
||||
def x():
|
||||
12x
|
||||
""": (
|
||||
(
|
||||
lambda v: v < (3, 10),
|
||||
"""\
|
||||
File "<test run>", line 2
|
||||
12x
|
||||
^
|
||||
SyntaxError: invalid syntax
|
||||
""",
|
||||
),
|
||||
(
|
||||
lambda v: v >= (3, 10),
|
||||
"""\
|
||||
File "<test run>", line 2
|
||||
12x
|
||||
^
|
||||
SyntaxError: invalid decimal literal
|
||||
""",
|
||||
),
|
||||
),
|
||||
"""\
|
||||
def x():
|
||||
foo(x, z for z in range(10), t, w)
|
||||
""": (
|
||||
(
|
||||
lambda v: v < (3, 10),
|
||||
"""\
|
||||
File "<test run>", line 2
|
||||
foo(x, z for z in range(10), t, w)
|
||||
^
|
||||
SyntaxError: Generator expression must be parenthesized
|
||||
""",
|
||||
),
|
||||
(
|
||||
lambda v: v >= (3, 10),
|
||||
"""\
|
||||
File "<test run>", line 2
|
||||
foo(x, z for z in range(10), t, w)
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
SyntaxError: Generator expression must be parenthesized
|
||||
""",
|
||||
),
|
||||
),
|
||||
# exception raised
|
||||
"""\
|
||||
print(123)
|
||||
try:
|
||||
abs(1 / 0)
|
||||
except ValueError:
|
||||
pass
|
||||
""": (
|
||||
(
|
||||
lambda v: v < (3, 11),
|
||||
"""\
|
||||
123
|
||||
Traceback (most recent call last):
|
||||
File "<test run>", line 3, in <module>
|
||||
abs(1 / 0)
|
||||
ZeroDivisionError: division by zero
|
||||
""",
|
||||
),
|
||||
(
|
||||
lambda v: v >= (3, 11),
|
||||
"""\
|
||||
123
|
||||
Traceback (most recent call last):
|
||||
File "<test run>", line 3, in <module>
|
||||
abs(1 / 0)
|
||||
~~^~~
|
||||
ZeroDivisionError: division by zero
|
||||
""",
|
||||
),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"source,result",
|
||||
[
|
||||
(source, result)
|
||||
for source, results in EXPRESSION_TESTS.items()
|
||||
for condition, result in results
|
||||
if condition(sys.version_info)
|
||||
],
|
||||
)
|
||||
async def test_format_exception_expressions(
|
||||
monkeypatch: pytest.MonkeyPatch, source: str, result: str
|
||||
) -> None:
|
||||
await _run_dev_output(monkeypatch, source, result, debug=True, repl=True)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"source,result",
|
||||
[
|
||||
(source, result)
|
||||
for source, results in STATEMENT_TESTS.items()
|
||||
for condition, result in results
|
||||
if condition(sys.version_info)
|
||||
],
|
||||
)
|
||||
async def test_format_exception_statements(
|
||||
monkeypatch: pytest.MonkeyPatch, source: str, result: str
|
||||
) -> None:
|
||||
await _run_dev_output(monkeypatch, source, result, eval=True, repl=True)
|
||||
|
||||
|
||||
async def test_successful_run_debug(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
source = "print('hello world'), 123"
|
||||
result = "(None, 123)"
|
||||
await _run_dev_output(monkeypatch, source, result, debug=True)
|
||||
|
||||
|
||||
async def test_successful_run_eval(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
source = """\
|
||||
print("hello world")
|
||||
return 123
|
||||
"""
|
||||
result = """\
|
||||
hello world
|
||||
123"""
|
||||
await _run_dev_output(monkeypatch, source, result, eval=True)
|
||||
|
||||
|
||||
async def test_successful_run_repl_eval(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
source = "print('hello world'), 123"
|
||||
result = """\
|
||||
hello world
|
||||
(None, 123)"""
|
||||
await _run_dev_output(monkeypatch, source, result, repl=True)
|
||||
|
||||
|
||||
async def test_successful_run_repl_exec(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
source = """\
|
||||
print("hello")
|
||||
print("world")
|
||||
"""
|
||||
result = """\
|
||||
hello
|
||||
world
|
||||
"""
|
||||
await _run_dev_output(monkeypatch, source, result, repl=True)
|
||||
Loading…
x
Reference in New Issue
Block a user