Michael H 0e9086ca1f [Core] Fix error handling in loading extensions. (#2688)
* fixes 2687

* raise the right exception
2019-05-31 21:38:44 +02:00

547 lines
18 KiB
Python

import asyncio
import inspect
import os
import logging
from collections import Counter
from enum import Enum
from importlib.machinery import ModuleSpec
from pathlib import Path
from typing import Optional, Union, List
import discord
import sys
from discord.ext.commands import when_mentioned_or
from . import Config, i18n, commands, errors
from .cog_manager import CogManager
from .rpc import RPCMixin
from .utils import common_filters
CUSTOM_GROUPS = "CUSTOM_GROUPS"
log = logging.getLogger("redbot")
def _is_submodule(parent, child):
return parent == child or child.startswith(parent + ".")
class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
"""Mixin for the main bot class.
This exists because `Red` inherits from `discord.AutoShardedClient`, which
is something other bot classes may not want to have as a parent class.
"""
def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs):
self._shutdown_mode = ExitCodes.CRITICAL
self.db = Config.get_core_conf(force_registration=True)
self._co_owners = cli_flags.co_owner
self.rpc_enabled = cli_flags.rpc
self._last_exception = None
self.db.register_global(
token=None,
prefix=[],
packages=[],
owner=None,
whitelist=[],
blacklist=[],
locale="en-US",
embeds=True,
color=15158332,
fuzzy=False,
custom_info=None,
help__page_char_limit=1000,
help__max_pages_in_guild=2,
help__use_menus=False,
help__show_hidden=False,
help__verify_checks=True,
help__verify_exists=False,
help__tagline="",
disabled_commands=[],
disabled_command_msg="That command is disabled.",
api_tokens={},
extra_owner_destinations=[],
owner_opt_out_list=[],
)
self.db.register_guild(
prefix=[],
whitelist=[],
blacklist=[],
admin_role=None,
mod_role=None,
embeds=None,
use_bot_color=False,
fuzzy=False,
disabled_commands=[],
autoimmune_ids=[],
)
self.db.register_user(embeds=None)
self.db.init_custom(CUSTOM_GROUPS, 2)
self.db.register_custom(CUSTOM_GROUPS)
async def prefix_manager(bot, message):
if not cli_flags.prefix:
global_prefix = await bot.db.prefix()
else:
global_prefix = cli_flags.prefix
if message.guild is None:
return global_prefix
server_prefix = await bot.db.guild(message.guild).prefix()
if cli_flags.mentionable:
return (
when_mentioned_or(*server_prefix)(bot, message)
if server_prefix
else when_mentioned_or(*global_prefix)(bot, message)
)
else:
return server_prefix if server_prefix else global_prefix
if "command_prefix" not in kwargs:
kwargs["command_prefix"] = prefix_manager
if cli_flags.owner and "owner_id" not in kwargs:
kwargs["owner_id"] = cli_flags.owner
if "owner_id" not in kwargs:
loop = asyncio.get_event_loop()
loop.run_until_complete(self._dict_abuse(kwargs))
if "command_not_found" not in kwargs:
kwargs["command_not_found"] = "Command {} not found.\n{}"
self.counter = Counter()
self.uptime = None
self.checked_time_accuracy = None
self.color = discord.Embed.Empty # This is needed or color ends up 0x000000
self.main_dir = bot_dir
self.cog_mgr = CogManager()
super().__init__(*args, help_command=None, **kwargs)
# Do not manually use the help formatter attribute here, see `send_help_for`,
# for a documented API. The internals of this object are still subject to change.
self._help_formatter = commands.help.RedHelpFormatter()
self.add_command(commands.help.red_help)
self._permissions_hooks: List[commands.CheckPredicate] = []
async def send_help_for(
self, ctx: commands.Context, help_for: Union[commands.Command, commands.GroupMixin, str]
):
"""
Invokes Red's helpformatter for a given context and object.
"""
return await self._help_formatter.send_help(ctx, help_for)
async def _dict_abuse(self, indict):
"""
Please blame <@269933075037814786> for this.
:param indict:
:return:
"""
indict["owner_id"] = await self.db.owner()
i18n.set_locale(await self.db.locale())
async def embed_requested(self, channel, user, command=None) -> bool:
"""
Determine if an embed is requested for a response.
Parameters
----------
channel : `discord.abc.GuildChannel` or `discord.abc.PrivateChannel`
The channel to check embed settings for.
user : `discord.abc.User`
The user to check embed settings for.
command
(Optional) the command ran.
Returns
-------
bool
:code:`True` if an embed is requested
"""
if isinstance(channel, discord.abc.PrivateChannel) or (
command and command == self.get_command("help")
):
user_setting = await self.db.user(user).embeds()
if user_setting is not None:
return user_setting
else:
guild_setting = await self.db.guild(channel.guild).embeds()
if guild_setting is not None:
return guild_setting
global_setting = await self.db.embeds()
return global_setting
async def is_owner(self, user):
if user.id in self._co_owners:
return True
return await super().is_owner(user)
async def is_admin(self, member: discord.Member):
"""Checks if a member is an admin of their guild."""
admin_role = await self.db.guild(member.guild).admin_role()
try:
if any(role.id == admin_role for role in member.roles):
return True
except AttributeError: # someone passed a webhook to this
pass
return False
async def is_mod(self, member: discord.Member):
"""Checks if a member is a mod or admin of their guild."""
mod_role = await self.db.guild(member.guild).mod_role()
admin_role = await self.db.guild(member.guild).admin_role()
try:
if any(role.id in (mod_role, admin_role) for role in member.roles):
return True
except AttributeError: # someone passed a webhook to this
pass
return False
async def get_context(self, message, *, cls=commands.Context):
return await super().get_context(message, cls=cls)
async def process_commands(self, message: discord.Message):
"""
modification from the base to do the same thing in the command case
but dispatch an additional event for cogs which want to handle normal messages
differently to command messages,
without the overhead of additional get_context calls per cog
"""
if not message.author.bot:
ctx = await self.get_context(message)
if ctx.valid:
return await self.invoke(ctx)
self.dispatch("message_without_command", message)
@staticmethod
def list_packages():
"""Lists packages present in the cogs the folder"""
return os.listdir("cogs")
async def save_packages_status(self, packages):
await self.db.packages.set(packages)
async def add_loaded_package(self, pkg_name: str):
async with self.db.packages() as curr_pkgs:
if pkg_name not in curr_pkgs:
curr_pkgs.append(pkg_name)
async def remove_loaded_package(self, pkg_name: str):
async with self.db.packages() as curr_pkgs:
while pkg_name in curr_pkgs:
curr_pkgs.remove(pkg_name)
async def load_extension(self, spec: ModuleSpec):
# NB: this completely bypasses `discord.ext.commands.Bot._load_from_module_spec`
name = spec.name.split(".")[-1]
if name in self.extensions:
raise errors.PackageAlreadyLoaded(spec)
lib = spec.loader.load_module()
if not hasattr(lib, "setup"):
del lib
raise discord.ClientException(f"extension {name} does not have a setup function")
try:
if asyncio.iscoroutinefunction(lib.setup):
await lib.setup(self)
else:
lib.setup(self)
except Exception as e:
self._remove_module_references(lib.__name__)
self._call_module_finalizers(lib, name)
raise errors.CogLoadError() from e
else:
self._BotBase__extensions[name] = lib
def remove_cog(self, cogname: str):
cog = self.get_cog(cogname)
if cog is None:
return
for cls in inspect.getmro(cog.__class__):
try:
hook = getattr(cog, f"_{cls.__name__}__permissions_hook")
except AttributeError:
pass
else:
self.remove_permissions_hook(hook)
super().remove_cog(cogname)
for meth in self.rpc_handlers.pop(cogname.upper(), ()):
self.unregister_rpc_handler(meth)
async def is_automod_immune(
self, to_check: Union[discord.Message, commands.Context, discord.abc.User, discord.Role]
) -> bool:
"""
Checks if the user, message, context, or role should be considered immune from automated
moderation actions.
This will return ``False`` in direct messages.
Parameters
----------
to_check : `discord.Message` or `commands.Context` or `discord.abc.User` or `discord.Role`
Something to check if it would be immune
Returns
-------
bool
``True`` if immune
"""
guild = to_check.guild
if not guild:
return False
if isinstance(to_check, discord.Role):
ids_to_check = [to_check.id]
else:
author = getattr(to_check, "author", to_check)
try:
ids_to_check = [r.id for r in author.roles]
except AttributeError:
# webhook messages are a user not member,
# cheaper than isinstance
return True # webhooks require significant permissions to enable.
else:
ids_to_check.append(author.id)
immune_ids = await self.db.guild(guild).autoimmune_ids()
return any(i in immune_ids for i in ids_to_check)
@staticmethod
async def send_filtered(
destination: discord.abc.Messageable,
filter_mass_mentions=True,
filter_invite_links=True,
filter_all_links=False,
**kwargs,
):
"""
This is a convienience wrapper around
discord.abc.Messageable.send
It takes the destination you'd like to send to, which filters to apply
(defaults on mass mentions, and invite links) and any other parameters
normally accepted by destination.send
This should realistically only be used for responding using user provided
input. (unfortunately, including usernames)
Manually crafted messages which dont take any user input have no need of this
"""
content = kwargs.pop("content", None)
if content:
if filter_mass_mentions:
content = common_filters.filter_mass_mentions(content)
if filter_invite_links:
content = common_filters.filter_invites(content)
if filter_all_links:
content = common_filters.filter_urls(content)
await destination.send(content=content, **kwargs)
def add_cog(self, cog: commands.Cog):
if not isinstance(cog, commands.Cog):
raise RuntimeError(
f"The {cog.__class__.__name__} cog in the {cog.__module__} package does "
f"not inherit from the commands.Cog base class. The cog author must update "
f"the cog to adhere to this requirement."
)
if not hasattr(cog, "requires"):
commands.Cog.__init__(cog)
for cls in inspect.getmro(cog.__class__):
try:
hook = getattr(cog, f"_{cls.__name__}__permissions_hook")
except AttributeError:
pass
else:
self.add_permissions_hook(hook)
for command in cog.__cog_commands__:
if not isinstance(command, commands.Command):
raise RuntimeError(
f"The {cog.__class__.__name__} cog in the {cog.__module__} package,"
" is not using Red's command module, and cannot be added. "
"If this is your cog, please use `from redbot.core import commands`"
"in place of `from discord.ext import commands`. For more details on "
"this requirement, see this page: "
"http://red-discordbot.readthedocs.io/en/v3-develop/framework_commands.html"
)
super().add_cog(cog)
self.dispatch("cog_add", cog)
for command in cog.__cog_commands__:
self.dispatch("command_add", command)
def clear_permission_rules(self, guild_id: Optional[int]) -> None:
"""Clear all permission overrides in a scope.
Parameters
----------
guild_id : Optional[int]
The guild ID to wipe permission overrides for. If
``None``, this will clear all global rules and leave all
guild rules untouched.
"""
for cog in self.cogs.values():
cog.requires.clear_all_rules(guild_id)
for command in self.walk_commands():
command.requires.clear_all_rules(guild_id)
def add_permissions_hook(self, hook: commands.CheckPredicate) -> None:
"""Add a permissions hook.
Permissions hooks are check predicates which are called before
calling `Requires.verify`, and they can optionally return an
override: ``True`` to allow, ``False`` to deny, and ``None`` to
default to normal behaviour.
Parameters
----------
hook
A command check predicate which returns ``True``, ``False``
or ``None``.
"""
self._permissions_hooks.append(hook)
def remove_permissions_hook(self, hook: commands.CheckPredicate) -> None:
"""Remove a permissions hook.
Parameters are the same as those in `add_permissions_hook`.
Raises
------
ValueError
If the permissions hook has not been added.
"""
self._permissions_hooks.remove(hook)
async def verify_permissions_hooks(self, ctx: commands.Context) -> Optional[bool]:
"""Run permissions hooks.
Parameters
----------
ctx : commands.Context
The context for the command being invoked.
Returns
-------
Optional[bool]
``False`` if any hooks returned ``False``, ``True`` if any
hooks return ``True`` and none returned ``False``, ``None``
otherwise.
"""
hook_results = []
for hook in self._permissions_hooks:
result = await discord.utils.maybe_coroutine(hook, ctx)
if result is not None:
hook_results.append(result)
if hook_results:
if all(hook_results):
ctx.permission_state = commands.PermState.ALLOWED_BY_HOOK
return True
else:
ctx.permission_state = commands.PermState.DENIED_BY_HOOK
return False
async def get_owner_notification_destinations(self) -> List[discord.abc.Messageable]:
"""
Gets the users and channels to send to
"""
destinations = []
opt_outs = await self.db.owner_opt_out_list()
for user_id in (self.owner_id, *self._co_owners):
if user_id not in opt_outs:
user = self.get_user(user_id)
if user:
destinations.append(user)
channel_ids = await self.db.extra_owner_destinations()
for channel_id in channel_ids:
channel = self.get_channel(channel_id)
if channel:
destinations.append(channel)
return destinations
async def send_to_owners(self, content=None, **kwargs):
"""
This sends something to all owners and their configured extra destinations.
This takes the same arguments as discord.abc.Messageable.send
This logs failing sends
"""
destinations = await self.get_owner_notification_destinations()
async def wrapped_send(location, content=None, **kwargs):
try:
await location.send(content, **kwargs)
except Exception as _exc:
log.exception(
f"I could not send an owner notification to ({location.id}){location}"
)
sends = [wrapped_send(d, content, **kwargs) for d in destinations]
await asyncio.gather(*sends)
class Red(RedBase, discord.AutoShardedClient):
"""
You're welcome Caleb.
"""
async def logout(self):
"""Logs out of Discord and closes all connections."""
await super().logout()
async def shutdown(self, *, restart: bool = False):
"""Gracefully quit Red.
The program will exit with code :code:`0` by default.
Parameters
----------
restart : bool
If :code:`True`, the program will exit with code :code:`26`. If the
launcher sees this, it will attempt to restart the bot.
"""
if not restart:
self._shutdown_mode = ExitCodes.SHUTDOWN
else:
self._shutdown_mode = ExitCodes.RESTART
await self.logout()
class ExitCodes(Enum):
CRITICAL = 1
SHUTDOWN = 0
RESTART = 26