[V3 Core Commands] Refactor some commands for testing/RPC (#1691)

* Extract load/unload/reload

* Add a few more commands

* Refactor load/unload signature

* Add invite URL and version info

* Black fixes

* Split the incoming cog names in reload correctly

* Reformat

* Remove meta.bot
This commit is contained in:
Will 2018-06-01 20:49:59 -04:00 committed by Tobotimus
parent d47d12e961
commit 864b6d313e
3 changed files with 225 additions and 139 deletions

View File

@ -14,6 +14,7 @@ from redbot.core.cli import interactive_config, confirm, parse_cli_flags, ask_se
from redbot.core.core_commands import Core from redbot.core.core_commands import Core
from redbot.core.dev_commands import Dev from redbot.core.dev_commands import Dev
from redbot.core import rpc, __version__ from redbot.core import rpc, __version__
import redbot.meta
import asyncio import asyncio
import logging.handlers import logging.handlers
import logging import logging

View File

@ -44,12 +44,194 @@ OWNER_DISCLAIMER = (
_ = i18n.Translator("Core", __file__) _ = i18n.Translator("Core", __file__)
class CoreLogic:
def __init__(self, bot: Red):
self.bot = bot
async def _load(self, cog_names: list):
"""
Loads cogs by name.
Parameters
----------
cog_names : list of str
Returns
-------
tuple
3 element tuple of loaded, failed, and not found cogs.
"""
failed_packages = []
loaded_packages = []
notfound_packages = []
bot = self.bot
cogspecs = []
for name in cog_names:
try:
spec = await bot.cog_mgr.find_cog(name)
cogspecs.append((spec, name))
except RuntimeError:
notfound_packages.append(name)
for spec, name in cogspecs:
try:
self._cleanup_and_refresh_modules(spec.name)
await bot.load_extension(spec)
except Exception as e:
log.exception("Package loading failed", exc_info=e)
exception_log = "Exception during loading of cog\n"
exception_log += "".join(traceback.format_exception(type(e), e, e.__traceback__))
bot._last_exception = exception_log
failed_packages.append(name)
else:
await bot.add_loaded_package(name)
loaded_packages.append(name)
return loaded_packages, failed_packages, notfound_packages
def _cleanup_and_refresh_modules(self, module_name: str):
"""Interally reloads modules so that changes are detected"""
splitted = module_name.split(".")
def maybe_reload(new_name):
try:
lib = sys.modules[new_name]
except KeyError:
pass
else:
importlib._bootstrap._exec(lib.__spec__, lib)
modules = itertools.accumulate(splitted, "{}.{}".format)
for m in modules:
maybe_reload(m)
children = {name: lib for name, lib in sys.modules.items() if name.startswith(module_name)}
for child_name, lib in children.items():
importlib._bootstrap._exec(lib.__spec__, lib)
def _get_package_strings(self, packages: list, fmt: str, other: tuple = None):
"""
Gets the strings needed for the load, unload and reload commands
"""
packages = [inline(name) for name in packages]
if other is None:
other = ("", "")
plural = "s" if len(packages) > 1 else ""
use_and, other = ("", other[0]) if len(packages) == 1 else (" and ", other[1])
packages_string = ", ".join(packages[:-1]) + use_and + packages[-1]
form = {"plural": plural, "packs": packages_string, "other": other}
final_string = fmt.format(**form)
return final_string
async def _unload(self, cog_names: list):
"""
Unloads cogs with the given names.
Parameters
----------
cog_names : list of str
Returns
-------
tuple
2 element tuple of successful unloads and failed unloads.
"""
failed_packages = []
unloaded_packages = []
bot = self.bot
for name in cog_names:
if name in bot.extensions:
bot.unload_extension(name)
await bot.remove_loaded_package(name)
unloaded_packages.append(name)
else:
failed_packages.append(name)
return unloaded_packages, failed_packages
async def _reload(self, cog_names):
await self._unload(cog_names)
loaded, load_failed, not_found = await self._load(cog_names)
return loaded, load_failed, not_found
async def _name(self, name: str = None):
"""
Gets or sets the bot's username.
Parameters
----------
name : str
If passed, the bot will change it's username.
Returns
-------
str
The current (or new) username of the bot.
"""
if name is not None:
await self.bot.user.edit(username=name)
return self.bot.user.name
async def _prefixes(self, prefixes: list = None):
"""
Gets or sets the bot's global prefixes.
Parameters
----------
prefixes : list of str
If passed, the bot will set it's global prefixes.
Returns
-------
list of str
The current (or new) list of prefixes.
"""
if prefixes:
prefixes = sorted(prefixes, reverse=True)
await self.bot.db.prefix.set(prefixes)
return self.bot.db.prefix()
async def _version_info(self):
"""
Version information for Red and discord.py
Returns
-------
dict
`redbot` and `discordpy` keys containing version information for both.
"""
return {"redbot": __version__, "discordpy": discord.__version__}
async def _invite_url(self):
"""
Generates the invite URL for the bot.
Returns
-------
str
Invite URL.
"""
if self.bot.user.bot:
app_info = await self.bot.application_info()
return discord.utils.oauth_url(app_info.id)
return "Not a bot account!"
@i18n.cog_i18n(_) @i18n.cog_i18n(_)
class Core: class Core(CoreLogic):
"""Commands related to core functions""" """Commands related to core functions"""
def __init__(self, bot): def __init__(self, bot):
self.bot = bot # type: Red super().__init__(bot)
@commands.command(hidden=True) @commands.command(hidden=True)
async def ping(self, ctx): async def ping(self, ctx):
@ -236,8 +418,7 @@ class Core:
async def invite(self, ctx): async def invite(self, ctx):
"""Show's Red's invite url""" """Show's Red's invite url"""
if self.bot.user.bot: if self.bot.user.bot:
app_info = await self.bot.application_info() await ctx.author.send(await self._invite_url())
await ctx.author.send(discord.utils.oauth_url(app_info.id))
else: else:
await ctx.send("I'm not a bot account. I have no invite URL.") await ctx.send("I'm not a bot account. I have no invite URL.")
@ -319,149 +500,69 @@ class Core:
async def load(self, ctx, *, cog_name: str): async def load(self, ctx, *, cog_name: str):
"""Loads packages""" """Loads packages"""
failed_packages = [] cog_names = [c.strip() for c in cog_name.split(" ")]
loaded_packages = [] loaded, failed, not_found = await self._load(cog_names)
notfound_packages = []
cognames = [c.strip() for c in cog_name.split(" ")] if loaded:
cogspecs = []
for c in cognames:
try:
spec = await ctx.bot.cog_mgr.find_cog(c)
cogspecs.append((spec, c))
except RuntimeError:
notfound_packages.append(inline(c))
# await ctx.send(_("No module named '{}' was found in any"
# " cog path.").format(c))
if len(cogspecs) > 0:
for spec, name in cogspecs:
try:
await ctx.bot.load_extension(spec)
except Exception as e:
log.exception("Package loading failed", exc_info=e)
exception_log = "Exception in command '{}'\n" "".format(
ctx.command.qualified_name
)
exception_log += "".join(
traceback.format_exception(type(e), e, e.__traceback__)
)
self.bot._last_exception = exception_log
failed_packages.append(inline(name))
else:
await ctx.bot.add_loaded_package(name)
loaded_packages.append(inline(name))
if loaded_packages:
fmt = "Loaded {packs}" fmt = "Loaded {packs}"
formed = self.get_package_strings(loaded_packages, fmt) formed = self._get_package_strings(loaded, fmt)
await ctx.send(_(formed)) await ctx.send(formed)
if failed_packages: if failed:
fmt = ( fmt = (
"Failed to load package{plural} {packs}. Check your console or " "Failed to load package{plural} {packs}. Check your console or "
"logs for details." "logs for details."
) )
formed = self.get_package_strings(failed_packages, fmt) formed = self.get_package_strings(failed, fmt)
await ctx.send(_(formed)) await ctx.send(formed)
if notfound_packages: if not_found:
fmt = "The package{plural} {packs} {other} not found in any cog path." fmt = "The package{plural} {packs} {other} not found in any cog path."
formed = self.get_package_strings(notfound_packages, fmt, ("was", "were")) formed = self._get_package_strings(not_found, fmt, ("was", "were"))
await ctx.send(_(formed)) await ctx.send(formed)
@commands.group() @commands.group()
@checks.is_owner() @checks.is_owner()
async def unload(self, ctx, *, cog_name: str): async def unload(self, ctx, *, cog_name: str):
"""Unloads packages""" """Unloads packages"""
cognames = [c.strip() for c in cog_name.split(" ")]
failed_packages = []
unloaded_packages = []
for c in cognames: cog_names = [c.strip() for c in cog_name.split(" ")]
if c in ctx.bot.extensions:
ctx.bot.unload_extension(c)
await ctx.bot.remove_loaded_package(c)
unloaded_packages.append(inline(c))
else:
failed_packages.append(inline(c))
if unloaded_packages: unloaded, failed = await self._unload(cog_names)
if unloaded:
fmt = "Package{plural} {packs} {other} unloaded." fmt = "Package{plural} {packs} {other} unloaded."
formed = self.get_package_strings(unloaded_packages, fmt, ("was", "were")) formed = self._get_package_strings(unloaded, fmt, ("was", "were"))
await ctx.send(_(formed)) await ctx.send(_(formed))
if failed_packages: if failed:
fmt = "The package{plural} {packs} {other} not loaded." fmt = "The package{plural} {packs} {other} not loaded."
formed = self.get_package_strings(failed_packages, fmt, ("is", "are")) formed = self._get_package_strings(failed, fmt, ("is", "are"))
await ctx.send(_(formed)) await ctx.send(formed)
@commands.command(name="reload") @commands.command(name="reload")
@checks.is_owner() @checks.is_owner()
async def _reload(self, ctx, *, cog_name: str): async def reload_(self, ctx, *, cog_name: str):
"""Reloads packages""" """Reloads packages"""
cognames = [c.strip() for c in cog_name.split(" ")] cog_names = [c.strip() for c in cog_name.split(" ")]
for c in cognames: loaded, failed, not_found = await self._reload(cog_names)
ctx.bot.unload_extension(c)
cogspecs = [] if loaded:
failed_packages = []
loaded_packages = []
notfound_packages = []
for c in cognames:
try:
spec = await ctx.bot.cog_mgr.find_cog(c)
cogspecs.append((spec, c))
except RuntimeError:
notfound_packages.append(inline(c))
for spec, name in cogspecs:
try:
self.cleanup_and_refresh_modules(spec.name)
await ctx.bot.load_extension(spec)
loaded_packages.append(inline(name))
except Exception as e:
log.exception("Package reloading failed", exc_info=e)
exception_log = "Exception in command '{}'\n" "".format(ctx.command.qualified_name)
exception_log += "".join(traceback.format_exception(type(e), e, e.__traceback__))
self.bot._last_exception = exception_log
failed_packages.append(inline(name))
if loaded_packages:
fmt = "Package{plural} {packs} {other} reloaded." fmt = "Package{plural} {packs} {other} reloaded."
formed = self.get_package_strings(loaded_packages, fmt, ("was", "were")) formed = self._get_package_strings(loaded, fmt, ("was", "were"))
await ctx.send(_(formed)) await ctx.send(formed)
if failed_packages: if failed:
fmt = "Failed to reload package{plural} {packs}. Check your " "logs for details" fmt = "Failed to reload package{plural} {packs}. Check your " "logs for details"
formed = self.get_package_strings(failed_packages, fmt) formed = self._get_package_strings(failed, fmt)
await ctx.send(_(formed)) await ctx.send(formed)
if notfound_packages: if not_found:
fmt = "The package{plural} {packs} {other} not found in any cog path." fmt = "The package{plural} {packs} {other} not found in any cog path."
formed = self.get_package_strings(notfound_packages, fmt, ("was", "were")) formed = self._get_package_strings(not_found, fmt, ("was", "were"))
await ctx.send(_(formed)) await ctx.send(formed)
def get_package_strings(self, packages: list, fmt: str, other: tuple = None):
"""
Gets the strings needed for the load, unload and reload commands
"""
if other is None:
other = ("", "")
plural = "s" if len(packages) > 1 else ""
use_and, other = ("", other[0]) if len(packages) == 1 else (" and ", other[1])
packages_string = ", ".join(packages[:-1]) + use_and + packages[-1]
form = {"plural": plural, "packs": packages_string, "other": other}
final_string = fmt.format(**form)
return final_string
@commands.command(name="shutdown") @commands.command(name="shutdown")
@checks.is_owner() @checks.is_owner()
@ -491,26 +592,6 @@ class Core:
pass pass
await ctx.bot.shutdown(restart=True) await ctx.bot.shutdown(restart=True)
def cleanup_and_refresh_modules(self, module_name: str):
"""Interally reloads modules so that changes are detected"""
splitted = module_name.split(".")
def maybe_reload(new_name):
try:
lib = sys.modules[new_name]
except KeyError:
pass
else:
importlib._bootstrap._exec(lib.__spec__, lib)
modules = itertools.accumulate(splitted, "{}.{}".format)
for m in modules:
maybe_reload(m)
children = {name: lib for name, lib in sys.modules.items() if name.startswith(module_name)}
for child_name, lib in children.items():
importlib._bootstrap._exec(lib.__spec__, lib)
@commands.group(name="set") @commands.group(name="set")
async def _set(self, ctx): async def _set(self, ctx):
"""Changes Red's settings""" """Changes Red's settings"""
@ -711,7 +792,7 @@ class Core:
async def _username(self, ctx, *, username: str): async def _username(self, ctx, *, username: str):
"""Sets Red's username""" """Sets Red's username"""
try: try:
await ctx.bot.user.edit(username=username) await self._name(name=username)
except discord.HTTPException: except discord.HTTPException:
await ctx.send( await ctx.send(
_( _(
@ -743,8 +824,7 @@ class Core:
if not prefixes: if not prefixes:
await ctx.send_help() await ctx.send_help()
return return
prefixes = sorted(prefixes, reverse=True) await self._prefixes(prefixes)
await ctx.bot.db.prefix.set(prefixes)
await ctx.send(_("Prefix set.")) await ctx.send(_("Prefix set."))
@_set.command(aliases=["serverprefixes"]) @_set.command(aliases=["serverprefixes"])
@ -1278,7 +1358,7 @@ class Core:
if spec is None: if spec is None:
raise LookupError("No such cog found.") raise LookupError("No such cog found.")
self.cleanup_and_refresh_modules(spec.name) self._cleanup_and_refresh_modules(spec.name)
self.bot.load_extension(spec) self.bot.load_extension(spec)

5
redbot/meta.py Normal file
View File

@ -0,0 +1,5 @@
"""
This module will contain various attributes useful for testing and cog development.
"""
testing = False