From 864b6d313edac40afc4001cf6aeb0dc923ce53c1 Mon Sep 17 00:00:00 2001 From: Will Date: Fri, 1 Jun 2018 20:49:59 -0400 Subject: [PATCH] [V3 Core Commands] Refactor some commands for testing/RPC (#1691) * Extract load/unload/reload * Add a few more commands * Refactor load/unload signature * Add invite URL and version info * Black fixes * Split the incoming cog names in reload correctly * Reformat * Remove meta.bot --- redbot/__main__.py | 1 + redbot/core/core_commands.py | 358 +++++++++++++++++++++-------------- redbot/meta.py | 5 + 3 files changed, 225 insertions(+), 139 deletions(-) create mode 100644 redbot/meta.py diff --git a/redbot/__main__.py b/redbot/__main__.py index ad9248491..9e2083e88 100644 --- a/redbot/__main__.py +++ b/redbot/__main__.py @@ -14,6 +14,7 @@ from redbot.core.cli import interactive_config, confirm, parse_cli_flags, ask_se from redbot.core.core_commands import Core from redbot.core.dev_commands import Dev from redbot.core import rpc, __version__ +import redbot.meta import asyncio import logging.handlers import logging diff --git a/redbot/core/core_commands.py b/redbot/core/core_commands.py index 0ac261a07..05700b955 100644 --- a/redbot/core/core_commands.py +++ b/redbot/core/core_commands.py @@ -44,12 +44,194 @@ OWNER_DISCLAIMER = ( _ = i18n.Translator("Core", __file__) +class CoreLogic: + def __init__(self, bot: Red): + self.bot = bot + + async def _load(self, cog_names: list): + """ + Loads cogs by name. + Parameters + ---------- + cog_names : list of str + + Returns + ------- + tuple + 3 element tuple of loaded, failed, and not found cogs. + """ + failed_packages = [] + loaded_packages = [] + notfound_packages = [] + + bot = self.bot + + cogspecs = [] + + for name in cog_names: + try: + spec = await bot.cog_mgr.find_cog(name) + cogspecs.append((spec, name)) + except RuntimeError: + notfound_packages.append(name) + + for spec, name in cogspecs: + try: + self._cleanup_and_refresh_modules(spec.name) + await bot.load_extension(spec) + except Exception as e: + log.exception("Package loading failed", exc_info=e) + + exception_log = "Exception during loading of cog\n" + exception_log += "".join(traceback.format_exception(type(e), e, e.__traceback__)) + bot._last_exception = exception_log + failed_packages.append(name) + else: + await bot.add_loaded_package(name) + loaded_packages.append(name) + return loaded_packages, failed_packages, notfound_packages + + def _cleanup_and_refresh_modules(self, module_name: str): + """Interally reloads modules so that changes are detected""" + splitted = module_name.split(".") + + def maybe_reload(new_name): + try: + lib = sys.modules[new_name] + except KeyError: + pass + else: + importlib._bootstrap._exec(lib.__spec__, lib) + + modules = itertools.accumulate(splitted, "{}.{}".format) + for m in modules: + maybe_reload(m) + + children = {name: lib for name, lib in sys.modules.items() if name.startswith(module_name)} + for child_name, lib in children.items(): + importlib._bootstrap._exec(lib.__spec__, lib) + + def _get_package_strings(self, packages: list, fmt: str, other: tuple = None): + """ + Gets the strings needed for the load, unload and reload commands + """ + packages = [inline(name) for name in packages] + + if other is None: + other = ("", "") + plural = "s" if len(packages) > 1 else "" + use_and, other = ("", other[0]) if len(packages) == 1 else (" and ", other[1]) + packages_string = ", ".join(packages[:-1]) + use_and + packages[-1] + + form = {"plural": plural, "packs": packages_string, "other": other} + final_string = fmt.format(**form) + return final_string + + async def _unload(self, cog_names: list): + """ + Unloads cogs with the given names. + + Parameters + ---------- + cog_names : list of str + + Returns + ------- + tuple + 2 element tuple of successful unloads and failed unloads. + """ + failed_packages = [] + unloaded_packages = [] + + bot = self.bot + + for name in cog_names: + if name in bot.extensions: + bot.unload_extension(name) + await bot.remove_loaded_package(name) + unloaded_packages.append(name) + else: + failed_packages.append(name) + + return unloaded_packages, failed_packages + + async def _reload(self, cog_names): + await self._unload(cog_names) + + loaded, load_failed, not_found = await self._load(cog_names) + + return loaded, load_failed, not_found + + async def _name(self, name: str = None): + """ + Gets or sets the bot's username. + + Parameters + ---------- + name : str + If passed, the bot will change it's username. + + Returns + ------- + str + The current (or new) username of the bot. + """ + if name is not None: + await self.bot.user.edit(username=name) + + return self.bot.user.name + + async def _prefixes(self, prefixes: list = None): + """ + Gets or sets the bot's global prefixes. + + Parameters + ---------- + prefixes : list of str + If passed, the bot will set it's global prefixes. + + Returns + ------- + list of str + The current (or new) list of prefixes. + """ + if prefixes: + prefixes = sorted(prefixes, reverse=True) + await self.bot.db.prefix.set(prefixes) + return self.bot.db.prefix() + + async def _version_info(self): + """ + Version information for Red and discord.py + + Returns + ------- + dict + `redbot` and `discordpy` keys containing version information for both. + """ + return {"redbot": __version__, "discordpy": discord.__version__} + + async def _invite_url(self): + """ + Generates the invite URL for the bot. + + Returns + ------- + str + Invite URL. + """ + if self.bot.user.bot: + app_info = await self.bot.application_info() + return discord.utils.oauth_url(app_info.id) + return "Not a bot account!" + + @i18n.cog_i18n(_) -class Core: +class Core(CoreLogic): """Commands related to core functions""" def __init__(self, bot): - self.bot = bot # type: Red + super().__init__(bot) @commands.command(hidden=True) async def ping(self, ctx): @@ -236,8 +418,7 @@ class Core: async def invite(self, ctx): """Show's Red's invite url""" if self.bot.user.bot: - app_info = await self.bot.application_info() - await ctx.author.send(discord.utils.oauth_url(app_info.id)) + await ctx.author.send(await self._invite_url()) else: await ctx.send("I'm not a bot account. I have no invite URL.") @@ -319,149 +500,69 @@ class Core: async def load(self, ctx, *, cog_name: str): """Loads packages""" - failed_packages = [] - loaded_packages = [] - notfound_packages = [] + cog_names = [c.strip() for c in cog_name.split(" ")] + loaded, failed, not_found = await self._load(cog_names) - cognames = [c.strip() for c in cog_name.split(" ")] - cogspecs = [] - - for c in cognames: - try: - spec = await ctx.bot.cog_mgr.find_cog(c) - cogspecs.append((spec, c)) - except RuntimeError: - notfound_packages.append(inline(c)) - # await ctx.send(_("No module named '{}' was found in any" - # " cog path.").format(c)) - - if len(cogspecs) > 0: - for spec, name in cogspecs: - try: - await ctx.bot.load_extension(spec) - except Exception as e: - log.exception("Package loading failed", exc_info=e) - - exception_log = "Exception in command '{}'\n" "".format( - ctx.command.qualified_name - ) - exception_log += "".join( - traceback.format_exception(type(e), e, e.__traceback__) - ) - self.bot._last_exception = exception_log - failed_packages.append(inline(name)) - else: - await ctx.bot.add_loaded_package(name) - loaded_packages.append(inline(name)) - - if loaded_packages: + if loaded: fmt = "Loaded {packs}" - formed = self.get_package_strings(loaded_packages, fmt) - await ctx.send(_(formed)) + formed = self._get_package_strings(loaded, fmt) + await ctx.send(formed) - if failed_packages: + if failed: fmt = ( "Failed to load package{plural} {packs}. Check your console or " "logs for details." ) - formed = self.get_package_strings(failed_packages, fmt) - await ctx.send(_(formed)) + formed = self.get_package_strings(failed, fmt) + await ctx.send(formed) - if notfound_packages: + if not_found: fmt = "The package{plural} {packs} {other} not found in any cog path." - formed = self.get_package_strings(notfound_packages, fmt, ("was", "were")) - await ctx.send(_(formed)) + formed = self._get_package_strings(not_found, fmt, ("was", "were")) + await ctx.send(formed) @commands.group() @checks.is_owner() async def unload(self, ctx, *, cog_name: str): """Unloads packages""" - cognames = [c.strip() for c in cog_name.split(" ")] - failed_packages = [] - unloaded_packages = [] - for c in cognames: - if c in ctx.bot.extensions: - ctx.bot.unload_extension(c) - await ctx.bot.remove_loaded_package(c) - unloaded_packages.append(inline(c)) - else: - failed_packages.append(inline(c)) + cog_names = [c.strip() for c in cog_name.split(" ")] - if unloaded_packages: + unloaded, failed = await self._unload(cog_names) + + if unloaded: fmt = "Package{plural} {packs} {other} unloaded." - formed = self.get_package_strings(unloaded_packages, fmt, ("was", "were")) + formed = self._get_package_strings(unloaded, fmt, ("was", "were")) await ctx.send(_(formed)) - if failed_packages: + if failed: fmt = "The package{plural} {packs} {other} not loaded." - formed = self.get_package_strings(failed_packages, fmt, ("is", "are")) - await ctx.send(_(formed)) + formed = self._get_package_strings(failed, fmt, ("is", "are")) + await ctx.send(formed) @commands.command(name="reload") @checks.is_owner() - async def _reload(self, ctx, *, cog_name: str): + async def reload_(self, ctx, *, cog_name: str): """Reloads packages""" - cognames = [c.strip() for c in cog_name.split(" ")] + cog_names = [c.strip() for c in cog_name.split(" ")] - for c in cognames: - ctx.bot.unload_extension(c) + loaded, failed, not_found = await self._reload(cog_names) - cogspecs = [] - failed_packages = [] - loaded_packages = [] - notfound_packages = [] - - for c in cognames: - try: - spec = await ctx.bot.cog_mgr.find_cog(c) - cogspecs.append((spec, c)) - except RuntimeError: - notfound_packages.append(inline(c)) - - for spec, name in cogspecs: - try: - self.cleanup_and_refresh_modules(spec.name) - await ctx.bot.load_extension(spec) - loaded_packages.append(inline(name)) - except Exception as e: - log.exception("Package reloading failed", exc_info=e) - - exception_log = "Exception in command '{}'\n" "".format(ctx.command.qualified_name) - exception_log += "".join(traceback.format_exception(type(e), e, e.__traceback__)) - self.bot._last_exception = exception_log - - failed_packages.append(inline(name)) - - if loaded_packages: + if loaded: fmt = "Package{plural} {packs} {other} reloaded." - formed = self.get_package_strings(loaded_packages, fmt, ("was", "were")) - await ctx.send(_(formed)) + formed = self._get_package_strings(loaded, fmt, ("was", "were")) + await ctx.send(formed) - if failed_packages: + if failed: fmt = "Failed to reload package{plural} {packs}. Check your " "logs for details" - formed = self.get_package_strings(failed_packages, fmt) - await ctx.send(_(formed)) + formed = self._get_package_strings(failed, fmt) + await ctx.send(formed) - if notfound_packages: + if not_found: fmt = "The package{plural} {packs} {other} not found in any cog path." - formed = self.get_package_strings(notfound_packages, fmt, ("was", "were")) - await ctx.send(_(formed)) - - def get_package_strings(self, packages: list, fmt: str, other: tuple = None): - """ - Gets the strings needed for the load, unload and reload commands - """ - if other is None: - other = ("", "") - plural = "s" if len(packages) > 1 else "" - use_and, other = ("", other[0]) if len(packages) == 1 else (" and ", other[1]) - packages_string = ", ".join(packages[:-1]) + use_and + packages[-1] - - form = {"plural": plural, "packs": packages_string, "other": other} - final_string = fmt.format(**form) - return final_string + formed = self._get_package_strings(not_found, fmt, ("was", "were")) + await ctx.send(formed) @commands.command(name="shutdown") @checks.is_owner() @@ -491,26 +592,6 @@ class Core: pass await ctx.bot.shutdown(restart=True) - def cleanup_and_refresh_modules(self, module_name: str): - """Interally reloads modules so that changes are detected""" - splitted = module_name.split(".") - - def maybe_reload(new_name): - try: - lib = sys.modules[new_name] - except KeyError: - pass - else: - importlib._bootstrap._exec(lib.__spec__, lib) - - modules = itertools.accumulate(splitted, "{}.{}".format) - for m in modules: - maybe_reload(m) - - children = {name: lib for name, lib in sys.modules.items() if name.startswith(module_name)} - for child_name, lib in children.items(): - importlib._bootstrap._exec(lib.__spec__, lib) - @commands.group(name="set") async def _set(self, ctx): """Changes Red's settings""" @@ -711,7 +792,7 @@ class Core: async def _username(self, ctx, *, username: str): """Sets Red's username""" try: - await ctx.bot.user.edit(username=username) + await self._name(name=username) except discord.HTTPException: await ctx.send( _( @@ -743,8 +824,7 @@ class Core: if not prefixes: await ctx.send_help() return - prefixes = sorted(prefixes, reverse=True) - await ctx.bot.db.prefix.set(prefixes) + await self._prefixes(prefixes) await ctx.send(_("Prefix set.")) @_set.command(aliases=["serverprefixes"]) @@ -1278,7 +1358,7 @@ class Core: if spec is None: raise LookupError("No such cog found.") - self.cleanup_and_refresh_modules(spec.name) + self._cleanup_and_refresh_modules(spec.name) self.bot.load_extension(spec) diff --git a/redbot/meta.py b/redbot/meta.py new file mode 100644 index 000000000..62961fbc6 --- /dev/null +++ b/redbot/meta.py @@ -0,0 +1,5 @@ +""" +This module will contain various attributes useful for testing and cog development. +""" + +testing = False