mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-21 18:27:59 -05:00
Permissions redesign (#2149)
API changes: - Cogs must now inherit from `commands.Cog` (see #2151 for discussion and more details) - All functions which are not decorators in the `redbot.core.checks` module are now deprecated in favour of their counterparts in `redbot.core.utils.mod`. This is to make this module more consistent and end the confusing naming convention. - `redbot.core.checks.check_overrides` function is now gone, overrideable checks can now be created with the `@commands.permissions_check` decorator - Command, Group, Cog and Context have some new attributes and methods, but they are for internal use so shouldn't concern cog creators (unless they're making a permissions cog!). - `__permissions_check_before` and `__permissions_check_after` have been replaced: A cog method named `__permissions_hook` will be evaluated as permissions hooks in the same way `__permissions_check_before` previously was. Permissions hooks can also be added/removed/verified through the new `*_permissions_hook()` methods on the bot object, and they will be verified even when permissions is unloaded. - New utility method `redbot.core.utils.chat_formatting.humanize_list` - New dependency [`schema`](https://github.com/keleshev/schema) User-facing changes: - When a `@bot_has_permissions` check fails, the bot will respond saying what permissions were actually missing. - All YAML-related `[p]permissions` subcommands now reside under the `[p]permissions acl` sub-group (tbh I still think the whole cog has too many top-level commands) - The YAML schema for these commands has been changed - A rule cannot be set as allow and deny at the same time (previously this would just default to allow) Documentation: - New documentation for `redbot.core.commands.requires` and `redbot.core.checks` modules - Renewed documentation for the permissions cog - `sphinx.ext.doctest` is now enabled Note: standard discord.py checks will still behave exactly the same way, in fact they are checked before `Requires` is looked at, so they are not overrideable. Signed-off-by: Toby Harradine <tobyharradine@gmail.com>
This commit is contained in:
@@ -5,17 +5,12 @@ from collections import Counter
|
||||
from enum import Enum
|
||||
from importlib.machinery import ModuleSpec
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
from typing import Optional, Union, List
|
||||
|
||||
import discord
|
||||
import sys
|
||||
from discord.ext.commands import when_mentioned_or
|
||||
|
||||
# This supresses the PyNaCl warning that isn't relevant here
|
||||
from discord.voice_client import VoiceClient
|
||||
|
||||
VoiceClient.warn_nacl = False
|
||||
|
||||
from .cog_manager import CogManager
|
||||
from . import Config, i18n, commands
|
||||
from .rpc import RPCMixin
|
||||
@@ -124,6 +119,7 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
self.add_command(help_)
|
||||
|
||||
self._sentry_mgr = None
|
||||
self._permissions_hooks: List[commands.CheckPredicate] = []
|
||||
|
||||
def enable_sentry(self):
|
||||
"""Enable Sentry logging for Red."""
|
||||
@@ -200,7 +196,8 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
async def get_context(self, message, *, cls=commands.Context):
|
||||
return await super().get_context(message, cls=cls)
|
||||
|
||||
def list_packages(self):
|
||||
@staticmethod
|
||||
def list_packages():
|
||||
"""Lists packages present in the cogs the folder"""
|
||||
return os.listdir("cogs")
|
||||
|
||||
@@ -234,7 +231,26 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
|
||||
self.extensions[name] = lib
|
||||
|
||||
def remove_cog(self, cogname):
|
||||
def remove_cog(self, cogname: str):
|
||||
cog = self.get_cog(cogname)
|
||||
if cog is None:
|
||||
return
|
||||
|
||||
for when in ("before", "after"):
|
||||
try:
|
||||
hook = getattr(cog, f"_{cog.__class__.__name__}__red_permissions_{when}")
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
self.remove_permissions_hook(hook, when)
|
||||
|
||||
try:
|
||||
hook = getattr(cog, f"_{cog.__class__.__name__}__red_permissions_before")
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
self.remove_permissions_hook(hook)
|
||||
|
||||
super().remove_cog(cogname)
|
||||
|
||||
for meth in self.rpc_handlers.pop(cogname.upper(), ()):
|
||||
@@ -365,9 +381,19 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
|
||||
await destination.send(content=content, **kwargs)
|
||||
|
||||
def add_cog(self, cog):
|
||||
def add_cog(self, cog: commands.Cog):
|
||||
if not isinstance(cog, commands.Cog):
|
||||
raise RuntimeError(
|
||||
f"The {cog.__class__.__name__} cog in the {cog.__module__} package does "
|
||||
f"not inherit from the commands.Cog base class. The cog author must update "
|
||||
f"the cog to adhere to this requirement."
|
||||
)
|
||||
if not hasattr(cog, "requires"):
|
||||
commands.Cog.__init__(cog)
|
||||
for attr in dir(cog):
|
||||
_attr = getattr(cog, attr)
|
||||
if attr == f"_{cog.__class__.__name__}__permissions_hook":
|
||||
self.add_permissions_hook(_attr)
|
||||
if isinstance(_attr, discord.ext.commands.Command) and not isinstance(
|
||||
_attr, commands.Command
|
||||
):
|
||||
@@ -380,6 +406,7 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
"http://red-discordbot.readthedocs.io/en/v3-develop/framework_commands.html"
|
||||
)
|
||||
super().add_cog(cog)
|
||||
self.dispatch("cog_add", cog)
|
||||
|
||||
def add_command(self, command: commands.Command):
|
||||
if not isinstance(command, commands.Command):
|
||||
@@ -388,6 +415,76 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
super().add_command(command)
|
||||
self.dispatch("command_add", command)
|
||||
|
||||
def clear_permission_rules(self, guild_id: Optional[int]) -> None:
|
||||
"""Clear all permission overrides in a scope.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
guild_id : Optional[int]
|
||||
The guild ID to wipe permission overrides for. If
|
||||
``None``, this will clear all global rules and leave all
|
||||
guild rules untouched.
|
||||
|
||||
"""
|
||||
for cog in self.cogs.values():
|
||||
cog.requires.clear_all_rules(guild_id)
|
||||
for command in self.walk_commands():
|
||||
command.requires.clear_all_rules(guild_id)
|
||||
|
||||
def add_permissions_hook(self, hook: commands.CheckPredicate) -> None:
|
||||
"""Add a permissions hook.
|
||||
|
||||
Permissions hooks are check predicates which are called before
|
||||
calling `Requires.verify`, and they can optionally return an
|
||||
override: ``True`` to allow, ``False`` to deny, and ``None`` to
|
||||
default to normal behaviour.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
hook
|
||||
A command check predicate which returns ``True``, ``False``
|
||||
or ``None``.
|
||||
|
||||
"""
|
||||
self._permissions_hooks.append(hook)
|
||||
|
||||
def remove_permissions_hook(self, hook: commands.CheckPredicate) -> None:
|
||||
"""Remove a permissions hook.
|
||||
|
||||
Parameters are the same as those in `add_permissions_hook`.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the permissions hook has not been added.
|
||||
|
||||
"""
|
||||
self._permissions_hooks.remove(hook)
|
||||
|
||||
async def verify_permissions_hooks(self, ctx: commands.Context) -> Optional[bool]:
|
||||
"""Run permissions hooks.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx : commands.Context
|
||||
The context for the command being invoked.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Optional[bool]
|
||||
``False`` if any hooks returned ``False``, ``True`` if any
|
||||
hooks return ``True`` and none returned ``False``, ``None``
|
||||
otherwise.
|
||||
|
||||
"""
|
||||
hook_results = []
|
||||
for hook in self._permissions_hooks:
|
||||
result = await discord.utils.maybe_coroutine(hook, ctx)
|
||||
if result is not None:
|
||||
hook_results.append(result)
|
||||
if hook_results:
|
||||
return all(hook_results)
|
||||
|
||||
|
||||
class Red(RedBase, discord.AutoShardedClient):
|
||||
"""
|
||||
|
||||
@@ -1,126 +1,77 @@
|
||||
import warnings
|
||||
from typing import Awaitable, TYPE_CHECKING, Dict
|
||||
|
||||
import discord
|
||||
from redbot.core import commands
|
||||
|
||||
from .commands import (
|
||||
bot_has_permissions,
|
||||
has_permissions,
|
||||
is_owner,
|
||||
guildowner,
|
||||
guildowner_or_permissions,
|
||||
admin,
|
||||
admin_or_permissions,
|
||||
mod,
|
||||
mod_or_permissions,
|
||||
check as _check_decorator,
|
||||
)
|
||||
from .utils.mod import (
|
||||
is_mod_or_superior as _is_mod_or_superior,
|
||||
is_admin_or_superior as _is_admin_or_superior,
|
||||
check_permissions as _check_permissions,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .bot import Red
|
||||
from .commands import Context
|
||||
|
||||
__all__ = [
|
||||
"bot_has_permissions",
|
||||
"has_permissions",
|
||||
"is_owner",
|
||||
"guildowner",
|
||||
"guildowner_or_permissions",
|
||||
"admin",
|
||||
"admin_or_permissions",
|
||||
"mod",
|
||||
"mod_or_permissions",
|
||||
"is_mod_or_superior",
|
||||
"is_admin_or_superior",
|
||||
"bot_in_a_guild",
|
||||
"check_permissions",
|
||||
]
|
||||
|
||||
|
||||
async def check_overrides(ctx, *, level):
|
||||
if await ctx.bot.is_owner(ctx.author):
|
||||
return True
|
||||
perm_cog = ctx.bot.get_cog("Permissions")
|
||||
if not perm_cog or ctx.cog == perm_cog:
|
||||
return None
|
||||
# don't break if someone loaded a cog named
|
||||
# permissions that doesn't implement this
|
||||
func = getattr(perm_cog, "check_overrides", None)
|
||||
val = None if func is None else await func(ctx, level)
|
||||
return val
|
||||
def bot_in_a_guild():
|
||||
"""Deny the command if the bot is not in a guild."""
|
||||
|
||||
|
||||
def is_owner(**kwargs):
|
||||
async def check(ctx):
|
||||
return await ctx.bot.is_owner(ctx.author, **kwargs)
|
||||
|
||||
return commands.check(check)
|
||||
|
||||
|
||||
async def check_permissions(ctx, perms):
|
||||
if await ctx.bot.is_owner(ctx.author):
|
||||
return True
|
||||
elif not perms:
|
||||
return False
|
||||
resolved = ctx.channel.permissions_for(ctx.author)
|
||||
|
||||
return resolved.administrator or all(
|
||||
getattr(resolved, name, None) == value for name, value in perms.items()
|
||||
)
|
||||
|
||||
|
||||
async def is_mod_or_superior(ctx):
|
||||
if ctx.guild is None:
|
||||
return await ctx.bot.is_owner(ctx.author)
|
||||
else:
|
||||
author = ctx.author
|
||||
settings = ctx.bot.db.guild(ctx.guild)
|
||||
mod_role_id = await settings.mod_role()
|
||||
admin_role_id = await settings.admin_role()
|
||||
|
||||
mod_role = discord.utils.get(ctx.guild.roles, id=mod_role_id)
|
||||
admin_role = discord.utils.get(ctx.guild.roles, id=admin_role_id)
|
||||
|
||||
return (
|
||||
await ctx.bot.is_owner(ctx.author)
|
||||
or mod_role in author.roles
|
||||
or admin_role in author.roles
|
||||
or author == ctx.guild.owner
|
||||
)
|
||||
|
||||
|
||||
async def is_admin_or_superior(ctx):
|
||||
if ctx.guild is None:
|
||||
return await ctx.bot.is_owner(ctx.author)
|
||||
else:
|
||||
author = ctx.author
|
||||
settings = ctx.bot.db.guild(ctx.guild)
|
||||
admin_role_id = await settings.admin_role()
|
||||
admin_role = discord.utils.get(ctx.guild.roles, id=admin_role_id)
|
||||
|
||||
return (
|
||||
await ctx.bot.is_owner(ctx.author)
|
||||
or admin_role in author.roles
|
||||
or author == ctx.guild.owner
|
||||
)
|
||||
|
||||
|
||||
def mod_or_permissions(**perms):
|
||||
async def predicate(ctx):
|
||||
override = await check_overrides(ctx, level="mod")
|
||||
return (
|
||||
override
|
||||
if override is not None
|
||||
else await check_permissions(ctx, perms) or await is_mod_or_superior(ctx)
|
||||
)
|
||||
|
||||
return commands.check(predicate)
|
||||
|
||||
|
||||
def admin_or_permissions(**perms):
|
||||
async def predicate(ctx):
|
||||
override = await check_overrides(ctx, level="admin")
|
||||
return (
|
||||
override
|
||||
if override is not None
|
||||
else await check_permissions(ctx, perms) or await is_admin_or_superior(ctx)
|
||||
)
|
||||
|
||||
return commands.check(predicate)
|
||||
|
||||
|
||||
def bot_in_a_guild(**kwargs):
|
||||
async def predicate(ctx):
|
||||
return len(ctx.bot.guilds) > 0
|
||||
|
||||
return commands.check(predicate)
|
||||
return _check_decorator(predicate)
|
||||
|
||||
|
||||
def guildowner_or_permissions(**perms):
|
||||
async def predicate(ctx):
|
||||
has_perms_or_is_owner = await check_permissions(ctx, perms)
|
||||
if ctx.guild is None:
|
||||
return has_perms_or_is_owner
|
||||
is_guild_owner = ctx.author == ctx.guild.owner
|
||||
|
||||
override = await check_overrides(ctx, level="guildowner")
|
||||
return override if override is not None else is_guild_owner or has_perms_or_is_owner
|
||||
|
||||
return commands.check(predicate)
|
||||
def is_mod_or_superior(bot: "Red", member: discord.Member) -> Awaitable[bool]:
|
||||
warnings.warn(
|
||||
"`redbot.core.checks.is_mod_or_superior` is deprecated and will be removed in a future "
|
||||
"release, please use `redbot.core.utils.mod.is_mod_or_superior` instead.",
|
||||
category=DeprecationWarning,
|
||||
)
|
||||
return _is_mod_or_superior(bot, member)
|
||||
|
||||
|
||||
def guildowner():
|
||||
return guildowner_or_permissions()
|
||||
def is_admin_or_superior(bot: "Red", member: discord.Member) -> Awaitable[bool]:
|
||||
warnings.warn(
|
||||
"`redbot.core.checks.is_admin_or_superior` is deprecated and will be removed in a future "
|
||||
"release, please use `redbot.core.utils.mod.is_admin_or_superior` instead.",
|
||||
category=DeprecationWarning,
|
||||
)
|
||||
return _is_admin_or_superior(bot, member)
|
||||
|
||||
|
||||
def admin():
|
||||
return admin_or_permissions()
|
||||
|
||||
|
||||
def mod():
|
||||
return mod_or_permissions()
|
||||
def check_permissions(ctx: "Context", perms: Dict[str, bool]) -> Awaitable[bool]:
|
||||
warnings.warn(
|
||||
"`redbot.core.checks.check_permissions` is deprecated and will be removed in a future "
|
||||
"release, please use `redbot.core.utils.mod.check_permissions`."
|
||||
)
|
||||
return _check_permissions(ctx, perms)
|
||||
|
||||
@@ -311,7 +311,7 @@ _ = Translator("CogManagerUI", __file__)
|
||||
|
||||
|
||||
@cog_i18n(_)
|
||||
class CogManagerUI:
|
||||
class CogManagerUI(commands.Cog):
|
||||
"""Commands to interface with Red's cog manager."""
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -2,4 +2,6 @@
|
||||
from discord.ext.commands import *
|
||||
from .commands import *
|
||||
from .context import *
|
||||
from .converter import *
|
||||
from .errors import *
|
||||
from .requires import *
|
||||
|
||||
@@ -5,33 +5,118 @@ replace those from the `discord.ext.commands` module.
|
||||
"""
|
||||
import inspect
|
||||
import weakref
|
||||
from typing import Awaitable, Callable, TYPE_CHECKING
|
||||
from typing import Awaitable, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
from . import converter as converters
|
||||
from .errors import ConversionFailure
|
||||
from .requires import PermState, PrivilegeLevel, Requires
|
||||
from ..i18n import Translator
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .context import Context
|
||||
|
||||
__all__ = ["Command", "GroupMixin", "Group", "command", "group"]
|
||||
__all__ = [
|
||||
"Cog",
|
||||
"CogCommandMixin",
|
||||
"CogGroupMixin",
|
||||
"Command",
|
||||
"Group",
|
||||
"GroupMixin",
|
||||
"command",
|
||||
"group",
|
||||
]
|
||||
|
||||
_ = Translator("commands.commands", __file__)
|
||||
|
||||
|
||||
class Command(commands.Command):
|
||||
class CogCommandMixin:
|
||||
"""A mixin for cogs and commands."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
if isinstance(self, Command):
|
||||
decorated = self.callback
|
||||
else:
|
||||
decorated = self
|
||||
self.requires: Requires = Requires(
|
||||
privilege_level=getattr(
|
||||
decorated, "__requires_privilege_level__", PrivilegeLevel.NONE
|
||||
),
|
||||
user_perms=getattr(decorated, "__requires_user_perms__", {}),
|
||||
bot_perms=getattr(decorated, "__requires_bot_perms__", {}),
|
||||
checks=getattr(decorated, "__requires_checks__", []),
|
||||
)
|
||||
|
||||
def allow_for(self, model_id: int, guild_id: int) -> None:
|
||||
"""Actively allow this command for the given model."""
|
||||
self.requires.set_rule(model_id, PermState.ACTIVE_ALLOW, guild_id=guild_id)
|
||||
|
||||
def deny_to(self, model_id: int, guild_id: int) -> None:
|
||||
"""Actively deny this command to the given model."""
|
||||
cur_rule = self.requires.get_rule(model_id, guild_id=guild_id)
|
||||
if cur_rule is PermState.PASSIVE_ALLOW:
|
||||
self.requires.set_rule(model_id, PermState.CAUTIOUS_ALLOW, guild_id=guild_id)
|
||||
else:
|
||||
self.requires.set_rule(model_id, PermState.ACTIVE_DENY, guild_id=guild_id)
|
||||
|
||||
def clear_rule_for(self, model_id: int, guild_id: int) -> Tuple[PermState, PermState]:
|
||||
"""Clear the rule which is currently set for this model."""
|
||||
cur_rule = self.requires.get_rule(model_id, guild_id=guild_id)
|
||||
if cur_rule is PermState.ACTIVE_ALLOW:
|
||||
new_rule = PermState.NORMAL
|
||||
elif cur_rule is PermState.ACTIVE_DENY:
|
||||
new_rule = PermState.NORMAL
|
||||
elif cur_rule is PermState.CAUTIOUS_ALLOW:
|
||||
new_rule = PermState.PASSIVE_ALLOW
|
||||
else:
|
||||
return cur_rule, cur_rule
|
||||
self.requires.set_rule(model_id, new_rule, guild_id=guild_id)
|
||||
return cur_rule, new_rule
|
||||
|
||||
def set_default_rule(self, rule: Optional[bool], guild_id: int) -> None:
|
||||
"""Set the default rule for this cog or command.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
rule : Optional[bool]
|
||||
The rule to set as default. If ``True`` for allow,
|
||||
``False`` for deny and ``None`` for normal.
|
||||
guild_id : Optional[int]
|
||||
Specify to set the default rule for a specific guild.
|
||||
When ``None``, this will set the global default rule.
|
||||
|
||||
"""
|
||||
if guild_id:
|
||||
self.requires.set_default_guild_rule(guild_id, PermState.from_bool(rule))
|
||||
else:
|
||||
self.requires.default_global_rule = PermState.from_bool(rule)
|
||||
|
||||
|
||||
class Command(CogCommandMixin, commands.Command):
|
||||
"""Command class for Red.
|
||||
|
||||
This should not be created directly, and instead via the decorator.
|
||||
|
||||
This class inherits from `discord.ext.commands.Command`.
|
||||
This class inherits from `discord.ext.commands.Command`. The
|
||||
attributes listed below are simply additions to the ones listed
|
||||
with that class.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
checks : List[`coroutine function`]
|
||||
A list of check predicates which cannot be overridden, unlike
|
||||
`Requires.checks`.
|
||||
translator : Translator
|
||||
A translator for this command's help docstring.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._help_override = kwargs.pop("help_override", None)
|
||||
super().__init__(*args, **kwargs)
|
||||
self._help_override = kwargs.pop("help_override", None)
|
||||
self.translator = kwargs.pop("i18n", None)
|
||||
|
||||
@property
|
||||
@@ -59,11 +144,10 @@ class Command(commands.Command):
|
||||
pass
|
||||
|
||||
@property
|
||||
def parents(self):
|
||||
"""
|
||||
Returns all parent commands of this command.
|
||||
def parents(self) -> List["Group"]:
|
||||
"""List[Group] : Returns all parent commands of this command.
|
||||
|
||||
This is a list, sorted by the length of :attr:`.qualified_name` from highest to lowest.
|
||||
This is sorted by the length of :attr:`.qualified_name` from highest to lowest.
|
||||
If the command has no parents, this will be an empty list.
|
||||
"""
|
||||
cmd = self.parent
|
||||
@@ -73,6 +157,33 @@ class Command(commands.Command):
|
||||
cmd = cmd.parent
|
||||
return sorted(entries, key=lambda x: len(x.qualified_name), reverse=True)
|
||||
|
||||
async def can_run(self, ctx: "Context") -> bool:
|
||||
"""Check if this command can be run in the given context.
|
||||
|
||||
This function first checks if the command can be run using
|
||||
discord.py's method `discord.ext.commands.Command.can_run`,
|
||||
then will return the result of `Requires.verify`.
|
||||
"""
|
||||
ret = await super().can_run(ctx)
|
||||
if ret is False:
|
||||
return False
|
||||
|
||||
# This is so contexts invoking other commands can be checked with
|
||||
# this command as well
|
||||
original_command = ctx.command
|
||||
ctx.command = self
|
||||
|
||||
if self.parent is None and self.instance is not None:
|
||||
# For top-level commands, we need to check the cog's requires too
|
||||
ret = await self.instance.requires.verify(ctx)
|
||||
if ret is False:
|
||||
return False
|
||||
|
||||
try:
|
||||
return await self.requires.verify(ctx)
|
||||
finally:
|
||||
ctx.command = original_command
|
||||
|
||||
async def do_conversion(
|
||||
self, ctx: "Context", converter, argument: str, param: inspect.Parameter
|
||||
):
|
||||
@@ -179,8 +290,32 @@ class Command(commands.Command):
|
||||
else:
|
||||
return True
|
||||
|
||||
def allow_for(self, model_id: int, guild_id: int) -> None:
|
||||
super().allow_for(model_id, guild_id=guild_id)
|
||||
parents = self.parents
|
||||
if self.instance is not None:
|
||||
parents.append(self.instance)
|
||||
for parent in parents:
|
||||
cur_rule = parent.requires.get_rule(model_id, guild_id=guild_id)
|
||||
if cur_rule is PermState.NORMAL:
|
||||
parent.requires.set_rule(model_id, PermState.PASSIVE_ALLOW, guild_id=guild_id)
|
||||
elif cur_rule is PermState.ACTIVE_DENY:
|
||||
parent.requires.set_rule(model_id, PermState.CAUTIOUS_ALLOW, guild_id=guild_id)
|
||||
|
||||
class GroupMixin(commands.GroupMixin):
|
||||
def clear_rule_for(self, model_id: int, guild_id: int) -> Tuple[PermState, PermState]:
|
||||
old_rule, new_rule = super().clear_rule_for(model_id, guild_id=guild_id)
|
||||
if old_rule is PermState.ACTIVE_ALLOW:
|
||||
parents = self.parents
|
||||
if self.instance is not None:
|
||||
parents.append(self.instance)
|
||||
for parent in parents:
|
||||
should_continue = parent.reevaluate_rules_for(model_id, guild_id=guild_id)[1]
|
||||
if not should_continue:
|
||||
break
|
||||
return old_rule, new_rule
|
||||
|
||||
|
||||
class GroupMixin(discord.ext.commands.GroupMixin):
|
||||
"""Mixin for `Group` and `Red` classes.
|
||||
|
||||
This class inherits from :class:`discord.ext.commands.GroupMixin`.
|
||||
@@ -211,7 +346,34 @@ class GroupMixin(commands.GroupMixin):
|
||||
return decorator
|
||||
|
||||
|
||||
class Group(GroupMixin, Command, commands.Group):
|
||||
class CogGroupMixin:
|
||||
requires: Requires
|
||||
all_commands: Dict[str, Command]
|
||||
|
||||
def reevaluate_rules_for(
|
||||
self, model_id: int, guild_id: Optional[int]
|
||||
) -> Tuple[PermState, bool]:
|
||||
cur_rule = self.requires.get_rule(model_id, guild_id=guild_id)
|
||||
if cur_rule in (PermState.NORMAL, PermState.ACTIVE_ALLOW, PermState.ACTIVE_DENY):
|
||||
# These three states are unaffected by subcommand rules
|
||||
return cur_rule, False
|
||||
else:
|
||||
# Remaining states can be changed if there exists no actively-allowed
|
||||
# subcommand (this includes subcommands multiple levels below)
|
||||
if any(
|
||||
cmd.requires.get_rule(model_id, guild_id=guild_id) in PermState.ALLOWED_STATES
|
||||
for cmd in self.all_commands.values()
|
||||
):
|
||||
return cur_rule, False
|
||||
elif cur_rule is PermState.PASSIVE_ALLOW:
|
||||
self.requires.set_rule(model_id, PermState.NORMAL, guild_id=guild_id)
|
||||
return PermState.NORMAL, True
|
||||
elif cur_rule is PermState.CAUTIOUS_ALLOW:
|
||||
self.requires.set_rule(model_id, PermState.ACTIVE_DENY, guild_id=guild_id)
|
||||
return PermState.ACTIVE_DENY, True
|
||||
|
||||
|
||||
class Group(GroupMixin, Command, CogGroupMixin, commands.Group):
|
||||
"""Group command class for Red.
|
||||
|
||||
This class inherits from `Command`, with :class:`GroupMixin` and
|
||||
@@ -222,7 +384,7 @@ class Group(GroupMixin, Command, commands.Group):
|
||||
self.autohelp = kwargs.pop("autohelp", True)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def invoke(self, ctx):
|
||||
async def invoke(self, ctx: "Context"):
|
||||
view = ctx.view
|
||||
previous = view.index
|
||||
view.skip_ws()
|
||||
@@ -247,7 +409,12 @@ class Group(GroupMixin, Command, commands.Group):
|
||||
await super().invoke(ctx)
|
||||
|
||||
|
||||
# decorators
|
||||
class Cog(CogCommandMixin, CogGroupMixin):
|
||||
"""Base class for a cog."""
|
||||
|
||||
@property
|
||||
def all_commands(self) -> Dict[str, Command]:
|
||||
return {cmd.name: cmd for cmd in self.__dict__.values() if isinstance(cmd, Command)}
|
||||
|
||||
|
||||
def command(name=None, cls=Command, **attrs):
|
||||
|
||||
@@ -4,8 +4,9 @@ from typing import Iterable, List
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
from redbot.core.utils.chat_formatting import box
|
||||
from redbot.core.utils import common_filters
|
||||
from .requires import PermState
|
||||
from ..utils.chat_formatting import box
|
||||
from ..utils import common_filters
|
||||
|
||||
TICK = "\N{WHITE HEAVY CHECK MARK}"
|
||||
|
||||
@@ -20,6 +21,10 @@ class Context(commands.Context):
|
||||
This class inherits from `discord.ext.commands.Context`.
|
||||
"""
|
||||
|
||||
def __init__(self, **attrs):
|
||||
super().__init__(**attrs)
|
||||
self.permission_state: PermState = PermState.NORMAL
|
||||
|
||||
async def send(self, content=None, **kwargs):
|
||||
"""Sends a message to the destination with the content given.
|
||||
|
||||
|
||||
41
redbot/core/commands/converter.py
Normal file
41
redbot/core/commands/converter.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import re
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import discord
|
||||
|
||||
from . import BadArgument
|
||||
from ..i18n import Translator
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .context import Context
|
||||
|
||||
__all__ = ["GuildConverter"]
|
||||
|
||||
_ = Translator("commands.converter", __file__)
|
||||
|
||||
ID_REGEX = re.compile(r"([0-9]{15,21})")
|
||||
|
||||
|
||||
class GuildConverter(discord.Guild):
|
||||
"""Converts to a `discord.Guild` object.
|
||||
|
||||
The lookup strategy is as follows (in order):
|
||||
|
||||
1. Lookup by ID.
|
||||
2. Lookup by name.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
async def convert(cls, ctx: "Context", argument: str) -> discord.Guild:
|
||||
match = ID_REGEX.fullmatch(argument)
|
||||
|
||||
if match is None:
|
||||
ret = discord.utils.get(ctx.bot.guilds, name=argument)
|
||||
else:
|
||||
guild_id = int(match.group(1))
|
||||
ret = ctx.bot.get_guild(guild_id)
|
||||
|
||||
if ret is None:
|
||||
raise BadArgument(_('Server "{name}" not found.').format(name=argument))
|
||||
|
||||
return ret
|
||||
@@ -1,8 +1,9 @@
|
||||
"""Errors module for the commands package."""
|
||||
import inspect
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
__all__ = ["ConversionFailure"]
|
||||
__all__ = ["ConversionFailure", "BotMissingPermissions"]
|
||||
|
||||
|
||||
class ConversionFailure(commands.BadArgument):
|
||||
@@ -13,3 +14,11 @@ class ConversionFailure(commands.BadArgument):
|
||||
self.argument = argument
|
||||
self.param = param
|
||||
super().__init__(*args)
|
||||
|
||||
|
||||
class BotMissingPermissions(commands.CheckFailure):
|
||||
"""Raised if the bot is missing permissions required to run a command."""
|
||||
|
||||
def __init__(self, missing: discord.Permissions, *args):
|
||||
self.missing: discord.Permissions = missing
|
||||
super().__init__(*args)
|
||||
|
||||
668
redbot/core/commands/requires.py
Normal file
668
redbot/core/commands/requires.py
Normal file
@@ -0,0 +1,668 @@
|
||||
"""
|
||||
commands.requires
|
||||
=================
|
||||
This module manages the logic of resolving command permissions and
|
||||
requirements. This includes rules which override those requirements,
|
||||
as well as custom checks which can be overriden, and some special
|
||||
checks like bot permissions checks.
|
||||
"""
|
||||
import asyncio
|
||||
import enum
|
||||
from typing import (
|
||||
Union,
|
||||
Optional,
|
||||
List,
|
||||
Callable,
|
||||
Awaitable,
|
||||
Dict,
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
TypeVar,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
import discord
|
||||
|
||||
from .converter import GuildConverter
|
||||
from .errors import BotMissingPermissions
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .commands import Command
|
||||
from .context import Context
|
||||
|
||||
_CommandOrCoro = TypeVar("_CommandOrCoro", Callable[..., Awaitable[Any]], Command)
|
||||
|
||||
__all__ = [
|
||||
"CheckPredicate",
|
||||
"DM_PERMS",
|
||||
"GlobalPermissionModel",
|
||||
"GuildPermissionModel",
|
||||
"PermissionModel",
|
||||
"PrivilegeLevel",
|
||||
"PermState",
|
||||
"Requires",
|
||||
"permissions_check",
|
||||
"bot_has_permissions",
|
||||
"has_permissions",
|
||||
"is_owner",
|
||||
"guildowner",
|
||||
"guildowner_or_permissions",
|
||||
"admin",
|
||||
"admin_or_permissions",
|
||||
"mod",
|
||||
"mod_or_permissions",
|
||||
]
|
||||
|
||||
_T = TypeVar("_T")
|
||||
GlobalPermissionModel = Union[
|
||||
discord.User,
|
||||
discord.VoiceChannel,
|
||||
discord.TextChannel,
|
||||
discord.CategoryChannel,
|
||||
discord.Role,
|
||||
GuildConverter, # Unfortunately this will have to do for now
|
||||
]
|
||||
GuildPermissionModel = Union[
|
||||
discord.Member,
|
||||
discord.VoiceChannel,
|
||||
discord.TextChannel,
|
||||
discord.CategoryChannel,
|
||||
discord.Role,
|
||||
GuildConverter,
|
||||
]
|
||||
PermissionModel = Union[GlobalPermissionModel, GuildPermissionModel]
|
||||
CheckPredicate = Callable[["Context"], Union[Optional[bool], Awaitable[Optional[bool]]]]
|
||||
|
||||
# Here we are trying to model DM permissions as closely as possible. The only
|
||||
# discrepancy I've found is that users can pin messages, but they cannot delete them.
|
||||
# This means manage_messages is only half True, so it's left as False.
|
||||
# This is also the same as the permissions returned when `permissions_for` is used in DM.
|
||||
DM_PERMS = discord.Permissions.none()
|
||||
DM_PERMS.update(
|
||||
add_reactions=True,
|
||||
attach_files=True,
|
||||
embed_links=True,
|
||||
external_emojis=True,
|
||||
mention_everyone=True,
|
||||
read_message_history=True,
|
||||
read_messages=True,
|
||||
send_messages=True,
|
||||
)
|
||||
|
||||
|
||||
class PrivilegeLevel(enum.IntEnum):
|
||||
"""Enumeration for special privileges."""
|
||||
|
||||
NONE = enum.auto()
|
||||
"""No special privilege level."""
|
||||
|
||||
MOD = enum.auto()
|
||||
"""User has the mod role."""
|
||||
|
||||
ADMIN = enum.auto()
|
||||
"""User has the admin role."""
|
||||
|
||||
GUILD_OWNER = enum.auto()
|
||||
"""User is the guild level."""
|
||||
|
||||
BOT_OWNER = enum.auto()
|
||||
"""User is a bot owner."""
|
||||
|
||||
@classmethod
|
||||
async def from_ctx(cls, ctx: "Context") -> "PrivilegeLevel":
|
||||
"""Get a command author's PrivilegeLevel based on context."""
|
||||
if await ctx.bot.is_owner(ctx.author):
|
||||
return cls.BOT_OWNER
|
||||
elif ctx.guild is None:
|
||||
return cls.NONE
|
||||
elif ctx.author == ctx.guild.owner:
|
||||
return cls.GUILD_OWNER
|
||||
|
||||
# The following is simply an optimised way to check if the user has the
|
||||
# admin or mod role.
|
||||
guild_settings = ctx.bot.db.guild(ctx.guild)
|
||||
admin_role_id = await guild_settings.admin_role()
|
||||
mod_role_id = await guild_settings.mod_role()
|
||||
is_mod = False
|
||||
for role in ctx.author.roles:
|
||||
if role.id == admin_role_id:
|
||||
return cls.ADMIN
|
||||
elif role.id == mod_role_id:
|
||||
is_mod = True
|
||||
if is_mod:
|
||||
return cls.MOD
|
||||
|
||||
return cls.NONE
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<{self.__class__.__name__}.{self.name}>"
|
||||
|
||||
|
||||
class PermState(enum.Enum):
|
||||
"""Enumeration for permission states used by rules."""
|
||||
|
||||
ACTIVE_ALLOW = enum.auto()
|
||||
"""This command has been actively allowed, default user checks
|
||||
should be ignored.
|
||||
"""
|
||||
|
||||
NORMAL = enum.auto()
|
||||
"""No overrides have been set for this command, make determination
|
||||
from default user checks.
|
||||
"""
|
||||
|
||||
PASSIVE_ALLOW = enum.auto()
|
||||
"""There exists a subcommand in the `ACTIVE_ALLOW` state, continue
|
||||
down the subcommand tree until we either find it or realise we're
|
||||
on the wrong branch.
|
||||
"""
|
||||
|
||||
CAUTIOUS_ALLOW = enum.auto()
|
||||
"""This command has been actively denied, but there exists a
|
||||
subcommand in the `ACTIVE_ALLOW` state. This occurs when
|
||||
`PASSIVE_ALLOW` and `ACTIVE_DENY` are combined.
|
||||
"""
|
||||
|
||||
ACTIVE_DENY = enum.auto()
|
||||
"""This command has been actively denied, terminate the command
|
||||
chain.
|
||||
"""
|
||||
|
||||
def transition_to(
|
||||
self, next_state: "PermState"
|
||||
) -> Tuple[Optional[bool], Union["PermState", Dict[bool, "PermState"]]]:
|
||||
return self.TRANSITIONS[self][next_state]
|
||||
|
||||
@classmethod
|
||||
def from_bool(cls, value: Optional[bool]) -> "PermState":
|
||||
"""Get a PermState from a bool or ``NoneType``."""
|
||||
if value is True:
|
||||
return cls.ACTIVE_ALLOW
|
||||
elif value is False:
|
||||
return cls.ACTIVE_DENY
|
||||
else:
|
||||
return cls.NORMAL
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<{self.__class__.__name__}.{self.name}>"
|
||||
|
||||
|
||||
# Here we're defining how we transition between states.
|
||||
# The dict is in the form:
|
||||
# previous state -> this state -> Tuple[override, next state]
|
||||
# "override" is a bool describing whether or not the command should be
|
||||
# invoked. It can be None, in which case the default permission checks
|
||||
# will be used instead.
|
||||
# There is also one case where the "next state" is dependent on the
|
||||
# result of the default permission checks - the transition from NORMAL
|
||||
# to PASSIVE_ALLOW. In this case "next state" is a dict mapping the
|
||||
# permission check results to the actual next state.
|
||||
PermState.TRANSITIONS = {
|
||||
PermState.ACTIVE_ALLOW: {
|
||||
PermState.ACTIVE_ALLOW: (True, PermState.ACTIVE_ALLOW),
|
||||
PermState.NORMAL: (True, PermState.ACTIVE_ALLOW),
|
||||
PermState.PASSIVE_ALLOW: (True, PermState.ACTIVE_ALLOW),
|
||||
PermState.CAUTIOUS_ALLOW: (True, PermState.CAUTIOUS_ALLOW),
|
||||
PermState.ACTIVE_DENY: (False, PermState.ACTIVE_DENY),
|
||||
},
|
||||
PermState.NORMAL: {
|
||||
PermState.ACTIVE_ALLOW: (True, PermState.ACTIVE_ALLOW),
|
||||
PermState.NORMAL: (None, PermState.NORMAL),
|
||||
PermState.PASSIVE_ALLOW: (True, {True: PermState.NORMAL, False: PermState.PASSIVE_ALLOW}),
|
||||
PermState.CAUTIOUS_ALLOW: (True, PermState.CAUTIOUS_ALLOW),
|
||||
PermState.ACTIVE_DENY: (False, PermState.ACTIVE_DENY),
|
||||
},
|
||||
PermState.PASSIVE_ALLOW: {
|
||||
PermState.ACTIVE_ALLOW: (True, PermState.ACTIVE_ALLOW),
|
||||
PermState.NORMAL: (False, PermState.NORMAL),
|
||||
PermState.PASSIVE_ALLOW: (True, PermState.PASSIVE_ALLOW),
|
||||
PermState.CAUTIOUS_ALLOW: (True, PermState.CAUTIOUS_ALLOW),
|
||||
PermState.ACTIVE_DENY: (False, PermState.ACTIVE_DENY),
|
||||
},
|
||||
PermState.CAUTIOUS_ALLOW: {
|
||||
PermState.ACTIVE_ALLOW: (True, PermState.ACTIVE_ALLOW),
|
||||
PermState.NORMAL: (False, PermState.ACTIVE_DENY),
|
||||
PermState.PASSIVE_ALLOW: (True, PermState.CAUTIOUS_ALLOW),
|
||||
PermState.CAUTIOUS_ALLOW: (True, PermState.CAUTIOUS_ALLOW),
|
||||
PermState.ACTIVE_DENY: (False, PermState.ACTIVE_DENY),
|
||||
},
|
||||
PermState.ACTIVE_DENY: { # We can only start from ACTIVE_DENY if it is set on a cog.
|
||||
PermState.ACTIVE_ALLOW: (True, PermState.ACTIVE_ALLOW), # Should never happen
|
||||
PermState.NORMAL: (False, PermState.ACTIVE_DENY),
|
||||
PermState.PASSIVE_ALLOW: (False, PermState.ACTIVE_DENY), # Should never happen
|
||||
PermState.CAUTIOUS_ALLOW: (False, PermState.ACTIVE_DENY), # Should never happen
|
||||
PermState.ACTIVE_DENY: (False, PermState.ACTIVE_DENY),
|
||||
},
|
||||
}
|
||||
PermState.ALLOWED_STATES = (
|
||||
PermState.ACTIVE_ALLOW,
|
||||
PermState.PASSIVE_ALLOW,
|
||||
PermState.CAUTIOUS_ALLOW,
|
||||
)
|
||||
|
||||
|
||||
class Requires:
|
||||
"""This class describes the requirements for executing a specific command.
|
||||
|
||||
The permissions described include both bot permissions and user
|
||||
permissions.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
checks : List[Callable[[Context], Union[bool, Awaitable[bool]]]]
|
||||
A list of checks which can be overridden by rules. Use
|
||||
`Command.checks` if you would like them to never be overridden.
|
||||
privilege_level : PrivilegeLevel
|
||||
The required privilege level (bot owner, admin, etc.) for users
|
||||
to execute the command. Can be ``None``, in which case the
|
||||
`user_perms` will be used exclusively, otherwise, for levels
|
||||
other than bot owner, the user can still run the command if
|
||||
they have the required `user_perms`.
|
||||
user_perms : Optional[discord.Permissions]
|
||||
The required permissions for users to execute the command. Can
|
||||
be ``None``, in which case the `privilege_level` will be used
|
||||
exclusively, otherwise, it will pass whether the user has the
|
||||
required `privilege_level` _or_ `user_perms`.
|
||||
bot_perms : discord.Permissions
|
||||
The required bot permissions for a command to be executed. This
|
||||
is not overrideable by other conditions.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
privilege_level: Optional[PrivilegeLevel],
|
||||
user_perms: Union[Dict[str, bool], discord.Permissions, None],
|
||||
bot_perms: Union[Dict[str, bool], discord.Permissions],
|
||||
checks: List[CheckPredicate],
|
||||
):
|
||||
self.checks: List[CheckPredicate] = checks
|
||||
self.privilege_level: Optional[PrivilegeLevel] = privilege_level
|
||||
|
||||
if isinstance(user_perms, dict):
|
||||
self.user_perms: Optional[discord.Permissions] = discord.Permissions.none()
|
||||
self.user_perms.update(**user_perms)
|
||||
else:
|
||||
self.user_perms = user_perms
|
||||
|
||||
if isinstance(bot_perms, dict):
|
||||
self.bot_perms: discord.Permissions = discord.Permissions.none()
|
||||
self.bot_perms.update(**bot_perms)
|
||||
else:
|
||||
self.bot_perms = bot_perms
|
||||
self.default_global_rule: PermState = PermState.NORMAL
|
||||
self._global_rules: _IntKeyDict[PermState] = _IntKeyDict()
|
||||
self._default_guild_rules: _IntKeyDict[PermState] = _IntKeyDict()
|
||||
self._guild_rules: _IntKeyDict[_IntKeyDict[PermState]] = _IntKeyDict()
|
||||
|
||||
@staticmethod
|
||||
def get_decorator(
|
||||
privilege_level: Optional[PrivilegeLevel], user_perms: Dict[str, bool]
|
||||
) -> Callable[["_CommandOrCoro"], "_CommandOrCoro"]:
|
||||
if not user_perms:
|
||||
user_perms = None
|
||||
|
||||
def decorator(func: "_CommandOrCoro") -> "_CommandOrCoro":
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
func.__requires_privilege_level__ = privilege_level
|
||||
func.__requires_user_perms__ = user_perms
|
||||
else:
|
||||
func.requires.privilege_level = privilege_level
|
||||
if user_perms is None:
|
||||
func.requires.user_perms = None
|
||||
else:
|
||||
func.requires.user_perms.update(**user_perms)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
def get_rule(self, model: Union[int, PermissionModel], guild_id: int) -> PermState:
|
||||
"""Get the rule for a particular model.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
model : PermissionModel
|
||||
The model to get the rule for.
|
||||
guild_id : int
|
||||
The ID of the guild for the rule's scope. Set to ``0``
|
||||
for a global rule.
|
||||
|
||||
Returns
|
||||
-------
|
||||
PermState
|
||||
The state for this rule. See the `PermState` class
|
||||
for an explanation.
|
||||
|
||||
"""
|
||||
if not isinstance(model, int):
|
||||
model = model.id
|
||||
if guild_id:
|
||||
rules = self._guild_rules.get(guild_id, _IntKeyDict())
|
||||
else:
|
||||
rules = self._global_rules
|
||||
return rules.get(model, PermState.NORMAL)
|
||||
|
||||
def set_rule(self, model_id: int, rule: PermState, guild_id: int) -> None:
|
||||
"""Set the rule for a particular model.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
model_id : PermissionModel
|
||||
The model to add a rule for.
|
||||
rule : PermState
|
||||
Which state this rule should be set as. See the `PermState`
|
||||
class for an explanation.
|
||||
guild_id : int
|
||||
The ID of the guild for the rule's scope. Set to ``0``
|
||||
for a global rule.
|
||||
|
||||
"""
|
||||
if guild_id:
|
||||
rules = self._guild_rules.setdefault(guild_id, _IntKeyDict())
|
||||
else:
|
||||
rules = self._global_rules
|
||||
if rule is PermState.NORMAL:
|
||||
rules.pop(model_id, None)
|
||||
else:
|
||||
rules[model_id] = rule
|
||||
|
||||
def clear_all_rules(self, guild_id: int) -> None:
|
||||
"""Clear all rules of a particular scope.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
guild_id : int
|
||||
The guild ID to clear rules for. If ``0``, this will
|
||||
clear all global rules and leave all guild rules
|
||||
untouched.
|
||||
|
||||
"""
|
||||
if guild_id:
|
||||
rules = self._guild_rules.setdefault(guild_id, _IntKeyDict())
|
||||
else:
|
||||
rules = self._global_rules
|
||||
rules.clear()
|
||||
|
||||
def get_default_guild_rule(self, guild_id: int) -> PermState:
|
||||
"""Get the default rule for a guild."""
|
||||
return self._default_guild_rules.get(guild_id, PermState.NORMAL)
|
||||
|
||||
def set_default_guild_rule(self, guild_id: int, rule: PermState) -> None:
|
||||
"""Set the default rule for a guild."""
|
||||
self._default_guild_rules[guild_id] = rule
|
||||
|
||||
async def verify(self, ctx: "Context") -> bool:
|
||||
"""Check if the given context passes the requirements.
|
||||
|
||||
This will check the bot permissions, overrides, user permissions
|
||||
and privilege level.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx : "Context"
|
||||
The invkokation context to check with.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
``True`` if the context passes the requirements.
|
||||
|
||||
Raises
|
||||
------
|
||||
BotMissingPermissions
|
||||
If the bot is missing required permissions to run the
|
||||
command.
|
||||
CommandError
|
||||
Propogated from any permissions checks.
|
||||
|
||||
"""
|
||||
await self._verify_bot(ctx)
|
||||
# Owner-only commands are non-overrideable
|
||||
if self.privilege_level is PrivilegeLevel.BOT_OWNER:
|
||||
return await ctx.bot.is_owner(ctx.author)
|
||||
|
||||
hook_result = await ctx.bot.verify_permissions_hooks(ctx)
|
||||
if hook_result is not None:
|
||||
return hook_result
|
||||
|
||||
return await self._transition_state(ctx)
|
||||
|
||||
async def _verify_bot(self, ctx: "Context") -> None:
|
||||
if ctx.guild is None:
|
||||
bot_user = ctx.bot.user
|
||||
else:
|
||||
bot_user = ctx.guild.me
|
||||
bot_perms = ctx.channel.permissions_for(bot_user)
|
||||
if not (bot_perms.administrator or bot_perms >= self.bot_perms):
|
||||
raise BotMissingPermissions(missing=self._missing_perms(self.bot_perms, bot_perms))
|
||||
|
||||
async def _transition_state(self, ctx: "Context") -> bool:
|
||||
prev_state = ctx.permission_state
|
||||
cur_state = self._get_rule_from_ctx(ctx)
|
||||
should_invoke, next_state = prev_state.transition_to(cur_state)
|
||||
if should_invoke is None:
|
||||
# NORMAL invokation, we simply follow standard procedure
|
||||
should_invoke = await self._verify_user(ctx)
|
||||
elif isinstance(next_state, dict):
|
||||
# NORMAL to PASSIVE_ALLOW; should we proceed as normal or transition?
|
||||
next_state = next_state[await self._verify_user(ctx)]
|
||||
|
||||
ctx.permission_state = next_state
|
||||
return should_invoke
|
||||
|
||||
async def _verify_user(self, ctx: "Context") -> bool:
|
||||
checks_pass = await self._verify_checks(ctx)
|
||||
if checks_pass is False:
|
||||
return False
|
||||
|
||||
if self.user_perms is not None:
|
||||
user_perms = ctx.channel.permissions_for(ctx.author)
|
||||
if user_perms.administrator or user_perms >= self.user_perms:
|
||||
return True
|
||||
|
||||
if self.privilege_level is not None:
|
||||
privilege_level = await PrivilegeLevel.from_ctx(ctx)
|
||||
if privilege_level >= self.privilege_level:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _get_rule_from_ctx(self, ctx: "Context") -> PermState:
|
||||
author = ctx.author
|
||||
guild = ctx.guild
|
||||
if ctx.guild is None:
|
||||
# We only check the user for DM channels
|
||||
rule = self._global_rules.get(author.id)
|
||||
if rule is not None:
|
||||
return rule
|
||||
return self.default_global_rule
|
||||
|
||||
rules_chain = [self._global_rules]
|
||||
guild_rules = self._guild_rules.get(ctx.guild.id)
|
||||
if guild_rules:
|
||||
rules_chain.append(guild_rules)
|
||||
|
||||
channels = []
|
||||
if author.voice is not None:
|
||||
channels.append(author.voice.channel)
|
||||
channels.append(ctx.channel)
|
||||
category = ctx.channel.category
|
||||
if category is not None:
|
||||
channels.append(category)
|
||||
|
||||
model_chain = [author, *channels, *author.roles, guild]
|
||||
|
||||
for rules in rules_chain:
|
||||
for model in model_chain:
|
||||
rule = rules.get(model.id)
|
||||
if rule is not None:
|
||||
return rule
|
||||
del model_chain[-1] # We don't check for the guild in guild rules
|
||||
|
||||
default_rule = self.get_default_guild_rule(guild.id)
|
||||
if default_rule is PermState.NORMAL:
|
||||
default_rule = self.default_global_rule
|
||||
return default_rule
|
||||
|
||||
async def _verify_checks(self, ctx: "Context") -> bool:
|
||||
if not self.checks:
|
||||
return True
|
||||
return await discord.utils.async_all(check(ctx) for check in self.checks)
|
||||
|
||||
@staticmethod
|
||||
def _get_perms_for(ctx: "Context", user: discord.abc.User) -> discord.Permissions:
|
||||
if ctx.guild is None:
|
||||
return DM_PERMS
|
||||
else:
|
||||
return ctx.channel.permissions_for(user)
|
||||
|
||||
@classmethod
|
||||
def _get_bot_perms(cls, ctx: "Context") -> discord.Permissions:
|
||||
return cls._get_perms_for(ctx, ctx.guild.me if ctx.guild else ctx.bot.user)
|
||||
|
||||
@staticmethod
|
||||
def _missing_perms(
|
||||
required: discord.Permissions, actual: discord.Permissions
|
||||
) -> discord.Permissions:
|
||||
# Explained in set theory terms:
|
||||
# Assuming R is the set of required permissions, and A is
|
||||
# the set of the user's permissions, the set of missing
|
||||
# permissions will be equal to R \ A, i.e. the relative
|
||||
# complement/difference of A with respect to R.
|
||||
relative_complement = required.value & ~actual.value
|
||||
return discord.Permissions(relative_complement)
|
||||
|
||||
@staticmethod
|
||||
def _member_as_user(member: discord.abc.User) -> discord.User:
|
||||
if isinstance(member, discord.Member):
|
||||
# noinspection PyProtectedMember
|
||||
return member._user
|
||||
return member
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"<Requires privilege_level={self.privilege_level!r} user_perms={self.user_perms!r} "
|
||||
f"bot_perms={self.bot_perms!r}>"
|
||||
)
|
||||
|
||||
|
||||
# check decorators
|
||||
|
||||
|
||||
def permissions_check(predicate: CheckPredicate):
|
||||
"""An overwriteable version of `discord.ext.commands.check`.
|
||||
|
||||
This has the same behaviour as `discord.ext.commands.check`,
|
||||
however this check can be ignored if the command is allowed
|
||||
through a permissions cog.
|
||||
"""
|
||||
|
||||
def decorator(func: "_CommandOrCoro") -> "_CommandOrCoro":
|
||||
if hasattr(func, "requires"):
|
||||
func.requires.checks.append(predicate)
|
||||
else:
|
||||
if not hasattr(func, "__requires_checks__"):
|
||||
func.__requires_checks__ = []
|
||||
# noinspection PyUnresolvedReferences
|
||||
func.__requires_checks__.append(predicate)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def bot_has_permissions(**perms: bool):
|
||||
"""Complain if the bot is missing permissions.
|
||||
|
||||
If the user tries to run the command, but the bot is missing the
|
||||
permissions, it will send a message describing which permissions
|
||||
are missing.
|
||||
|
||||
This check cannot be overridden by rules.
|
||||
"""
|
||||
|
||||
def decorator(func: "_CommandOrCoro") -> "_CommandOrCoro":
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
func.__requires_bot_perms__ = perms
|
||||
else:
|
||||
func.requires.bot_perms.update(**perms)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def has_permissions(**perms: bool):
|
||||
"""Restrict the command to users with these permissions.
|
||||
|
||||
This check can be overridden by rules.
|
||||
"""
|
||||
return Requires.get_decorator(None, perms)
|
||||
|
||||
|
||||
def is_owner():
|
||||
"""Restrict the command to bot owners.
|
||||
|
||||
This check cannot be overridden by rules.
|
||||
"""
|
||||
return Requires.get_decorator(PrivilegeLevel.BOT_OWNER, {})
|
||||
|
||||
|
||||
def guildowner_or_permissions(**perms: bool):
|
||||
"""Restrict the command to the guild owner or users with these permissions.
|
||||
|
||||
This check can be overridden by rules.
|
||||
"""
|
||||
return Requires.get_decorator(PrivilegeLevel.GUILD_OWNER, perms)
|
||||
|
||||
|
||||
def guildowner():
|
||||
"""Restrict the command to the guild owner.
|
||||
|
||||
This check can be overridden by rules.
|
||||
"""
|
||||
return guildowner_or_permissions()
|
||||
|
||||
|
||||
def admin_or_permissions(**perms: bool):
|
||||
"""Restrict the command to users with the admin role or these permissions.
|
||||
|
||||
This check can be overridden by rules.
|
||||
"""
|
||||
return Requires.get_decorator(PrivilegeLevel.ADMIN, perms)
|
||||
|
||||
|
||||
def admin():
|
||||
"""Restrict the command to users with the admin role.
|
||||
|
||||
This check can be overridden by rules.
|
||||
"""
|
||||
return admin_or_permissions()
|
||||
|
||||
|
||||
def mod_or_permissions(**perms: bool):
|
||||
"""Restrict the command to users with the mod role or these permissions.
|
||||
|
||||
This check can be overridden by rules.
|
||||
"""
|
||||
return Requires.get_decorator(PrivilegeLevel.MOD, perms)
|
||||
|
||||
|
||||
def mod():
|
||||
"""Restrict the command to users with the mod role.
|
||||
|
||||
This check can be overridden by rules.
|
||||
"""
|
||||
return mod_or_permissions()
|
||||
|
||||
|
||||
class _IntKeyDict(Dict[int, _T]):
|
||||
"""Dict subclass which throws KeyError when a non-int key is used."""
|
||||
|
||||
def __getitem__(self, key: Any) -> _T:
|
||||
if not isinstance(key, int):
|
||||
raise TypeError("Keys must be of type `int`")
|
||||
return super().__getitem__(key)
|
||||
|
||||
def __setitem__(self, key: Any, value: _T) -> None:
|
||||
if not isinstance(key, int):
|
||||
raise TypeError("Keys must be of type `int`")
|
||||
return super().__setitem__(key, value)
|
||||
@@ -241,7 +241,7 @@ class CoreLogic:
|
||||
|
||||
|
||||
@i18n.cog_i18n(_)
|
||||
class Core(CoreLogic):
|
||||
class Core(commands.Cog, CoreLogic):
|
||||
"""Commands related to core functions"""
|
||||
|
||||
def __init__(self, bot):
|
||||
|
||||
@@ -25,10 +25,11 @@ _ = Translator("Dev", __file__)
|
||||
START_CODE_BLOCK_RE = re.compile(r"^((```py)(?=\s)|(```))")
|
||||
|
||||
|
||||
class Dev:
|
||||
class Dev(commands.Cog):
|
||||
"""Various development focused utilities."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._last_result = None
|
||||
self.sessions = set()
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import logging
|
||||
import traceback
|
||||
from datetime import timedelta
|
||||
from distutils.version import StrictVersion
|
||||
from typing import List
|
||||
|
||||
import aiohttp
|
||||
import discord
|
||||
@@ -14,7 +15,7 @@ from pkg_resources import DistributionNotFound
|
||||
|
||||
from . import __version__, commands
|
||||
from .data_manager import storage_type
|
||||
from .utils.chat_formatting import inline, bordered
|
||||
from .utils.chat_formatting import inline, bordered, humanize_list
|
||||
from .utils import fuzzy_command_search, format_fuzzy_results
|
||||
|
||||
log = logging.getLogger("red")
|
||||
@@ -67,6 +68,14 @@ def init_events(bot, cli_flags):
|
||||
packages.extend(cli_flags.load_cogs)
|
||||
|
||||
if packages:
|
||||
# Load permissions first, for security reasons
|
||||
try:
|
||||
packages.remove("permissions")
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
packages.insert(0, "permissions")
|
||||
|
||||
to_remove = []
|
||||
print("Loading packages...")
|
||||
for package in packages:
|
||||
@@ -227,6 +236,21 @@ def init_events(bot, cli_flags):
|
||||
await ctx.send(embed=await format_fuzzy_results(ctx, fuzzy_commands, embed=True))
|
||||
else:
|
||||
await ctx.send(await format_fuzzy_results(ctx, fuzzy_commands, embed=False))
|
||||
elif isinstance(error, commands.BotMissingPermissions):
|
||||
missing_perms: List[str] = []
|
||||
for perm, value in error.missing:
|
||||
if value is True:
|
||||
perm_name = '"' + perm.replace("_", " ").title() + '"'
|
||||
missing_perms.append(perm_name)
|
||||
if len(missing_perms) == 1:
|
||||
plural = ""
|
||||
else:
|
||||
plural = "s"
|
||||
await ctx.send(
|
||||
"I require the {perms} permission{plural} to execute that command.".format(
|
||||
perms=humanize_list(missing_perms), plural=plural
|
||||
)
|
||||
)
|
||||
elif isinstance(error, commands.CheckFailure):
|
||||
pass
|
||||
elif isinstance(error, commands.NoPrivateMessage):
|
||||
|
||||
@@ -3,7 +3,7 @@ from . import commands
|
||||
|
||||
|
||||
def init_global_checks(bot):
|
||||
@bot.check
|
||||
@bot.check_once
|
||||
async def global_perms(ctx):
|
||||
"""Check the user is/isn't globally whitelisted/blacklisted."""
|
||||
if await bot.is_owner(ctx.author):
|
||||
@@ -15,7 +15,7 @@ def init_global_checks(bot):
|
||||
|
||||
return ctx.author.id not in await bot.db.blacklist()
|
||||
|
||||
@bot.check
|
||||
@bot.check_once
|
||||
async def local_perms(ctx: commands.Context):
|
||||
"""Check the user is/isn't locally whitelisted/blacklisted."""
|
||||
if await bot.is_owner(ctx.author):
|
||||
@@ -33,7 +33,7 @@ def init_global_checks(bot):
|
||||
|
||||
return not any(i in local_blacklist for i in _ids)
|
||||
|
||||
@bot.check
|
||||
@bot.check_once
|
||||
async def bots(ctx):
|
||||
"""Check the user is not another bot."""
|
||||
return not ctx.author.bot
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import itertools
|
||||
from typing import Sequence, Iterator
|
||||
from typing import Sequence, Iterator, List
|
||||
from redbot.core.i18n import Translator
|
||||
|
||||
_ = Translator("UtilsChatFormatting", __file__)
|
||||
|
||||
|
||||
def error(text: str) -> str:
|
||||
@@ -317,3 +320,33 @@ def escape(text: str, *, mass_mentions: bool = False, formatting: bool = False)
|
||||
if formatting:
|
||||
text = text.replace("`", "\\`").replace("*", "\\*").replace("_", "\\_").replace("~", "\\~")
|
||||
return text
|
||||
|
||||
|
||||
def humanize_list(items: Sequence[str]):
|
||||
"""Get comma-separted list, with the last element joined with *and*.
|
||||
|
||||
This uses an Oxford comma, because without one, items containing
|
||||
the word *and* would make the output difficult to interpret.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
items : Sequence[str]
|
||||
The items of the list to join together.
|
||||
|
||||
Examples
|
||||
--------
|
||||
.. testsetup::
|
||||
|
||||
from redbot.core.utils.chat_formatting import humanize_list
|
||||
|
||||
.. doctest::
|
||||
|
||||
>>> humanize_list(['One', 'Two', 'Three'])
|
||||
'One, Two, and Three'
|
||||
>>> humanize_list(['One'])
|
||||
'One'
|
||||
|
||||
"""
|
||||
if len(items) == 1:
|
||||
return items[0]
|
||||
return ", ".join(items[:-1]) + _(", and ") + items[-1]
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
from typing import List, Iterable, Union
|
||||
from typing import List, Iterable, Union, TYPE_CHECKING, Dict
|
||||
|
||||
import discord
|
||||
|
||||
from redbot.core import Config
|
||||
from redbot.core.bot import Red
|
||||
if TYPE_CHECKING:
|
||||
from .. import Config
|
||||
from ..bot import Red
|
||||
from ..commands import Context
|
||||
|
||||
|
||||
async def mass_purge(messages: List[discord.Message], channel: discord.TextChannel):
|
||||
@@ -87,7 +89,7 @@ def get_audit_reason(author: discord.Member, reason: str = None):
|
||||
|
||||
|
||||
async def is_allowed_by_hierarchy(
|
||||
bot: Red, settings: Config, guild: discord.Guild, mod: discord.Member, user: discord.Member
|
||||
bot: "Red", settings: "Config", guild: discord.Guild, mod: discord.Member, user: discord.Member
|
||||
):
|
||||
if not await settings.guild(guild).respect_hierarchy():
|
||||
return True
|
||||
@@ -95,7 +97,9 @@ async def is_allowed_by_hierarchy(
|
||||
return mod.top_role.position > user.top_role.position or is_special
|
||||
|
||||
|
||||
async def is_mod_or_superior(bot: Red, obj: Union[discord.Message, discord.Member, discord.Role]):
|
||||
async def is_mod_or_superior(
|
||||
bot: "Red", obj: Union[discord.Message, discord.Member, discord.Role]
|
||||
):
|
||||
"""Check if an object has mod or superior permissions.
|
||||
|
||||
If a message is passed, its author's permissions are checked. If a role is
|
||||
@@ -179,7 +183,7 @@ def strfdelta(delta: timedelta):
|
||||
|
||||
|
||||
async def is_admin_or_superior(
|
||||
bot: Red, obj: Union[discord.Message, discord.Member, discord.Role]
|
||||
bot: "Red", obj: Union[discord.Message, discord.Member, discord.Role]
|
||||
):
|
||||
"""Same as `is_mod_or_superior` except for admin permissions.
|
||||
|
||||
@@ -225,3 +229,36 @@ async def is_admin_or_superior(
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
async def check_permissions(ctx: "Context", perms: Dict[str, bool]) -> bool:
|
||||
"""Check if the author has required permissions.
|
||||
|
||||
This will always return ``True`` if the author is a bot owner, or
|
||||
has the ``administrator`` permission. If ``perms`` is empty, this
|
||||
will only check if the user is a bot owner.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx : Context
|
||||
The command invokation context to check.
|
||||
perms : Dict[str, bool]
|
||||
A dictionary mapping permissions to their required states.
|
||||
Valid permission names are those listed as properties of
|
||||
the `discord.Permissions` class.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
``True`` if the author has the required permissions.
|
||||
|
||||
"""
|
||||
if await ctx.bot.is_owner(ctx.author):
|
||||
return True
|
||||
elif not perms:
|
||||
return False
|
||||
resolved = ctx.channel.permissions_for(ctx.author)
|
||||
|
||||
return resolved.administrator or all(
|
||||
getattr(resolved, name, None) == value for name, value in perms.items()
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user