mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-22 02:37:57 -05:00
Merge branch 'V3/release/3.0.0' into V3/develop
# Conflicts: # redbot/cogs/audio/audio.py
This commit is contained in:
@@ -186,13 +186,23 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
async def is_admin(self, member: discord.Member):
|
||||
"""Checks if a member is an admin of their guild."""
|
||||
admin_role = await self.db.guild(member.guild).admin_role()
|
||||
return any(role.id == admin_role for role in member.roles)
|
||||
try:
|
||||
if any(role.id == admin_role for role in member.roles):
|
||||
return True
|
||||
except AttributeError: # someone passed a webhook to this
|
||||
pass
|
||||
return False
|
||||
|
||||
async def is_mod(self, member: discord.Member):
|
||||
"""Checks if a member is a mod or admin of their guild."""
|
||||
mod_role = await self.db.guild(member.guild).mod_role()
|
||||
admin_role = await self.db.guild(member.guild).admin_role()
|
||||
return any(role.id in (mod_role, admin_role) for role in member.roles)
|
||||
try:
|
||||
if any(role.id in (mod_role, admin_role) for role in member.roles):
|
||||
return True
|
||||
except AttributeError: # someone passed a webhook to this
|
||||
pass
|
||||
return False
|
||||
|
||||
async def get_context(self, message, *, cls=commands.Context):
|
||||
return await super().get_context(message, cls=cls)
|
||||
@@ -334,8 +344,14 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin):
|
||||
ids_to_check = [to_check.id]
|
||||
else:
|
||||
author = getattr(to_check, "author", to_check)
|
||||
ids_to_check = [r.id for r in author.roles]
|
||||
ids_to_check.append(author.id)
|
||||
try:
|
||||
ids_to_check = [r.id for r in author.roles]
|
||||
except AttributeError:
|
||||
# webhook messages are a user not member,
|
||||
# cheaper than isinstance
|
||||
return True # webhooks require significant permissions to enable.
|
||||
else:
|
||||
ids_to_check.append(author.id)
|
||||
|
||||
immune_ids = await self.db.guild(guild).autoimmune_ids()
|
||||
|
||||
|
||||
@@ -157,12 +157,31 @@ class Command(CogCommandMixin, commands.Command):
|
||||
cmd = cmd.parent
|
||||
return sorted(entries, key=lambda x: len(x.qualified_name), reverse=True)
|
||||
|
||||
async def can_run(self, ctx: "Context") -> bool:
|
||||
# noinspection PyMethodOverriding
|
||||
async def can_run(
|
||||
self,
|
||||
ctx: "Context",
|
||||
*,
|
||||
check_all_parents: bool = False,
|
||||
change_permission_state: bool = False,
|
||||
) -> bool:
|
||||
"""Check if this command can be run in the given context.
|
||||
|
||||
This function first checks if the command can be run using
|
||||
discord.py's method `discord.ext.commands.Command.can_run`,
|
||||
then will return the result of `Requires.verify`.
|
||||
|
||||
Keyword Arguments
|
||||
-----------------
|
||||
check_all_parents : bool
|
||||
If ``True``, this will check permissions for all of this
|
||||
command's parents and its cog as well as the command
|
||||
itself. Defaults to ``False``.
|
||||
change_permission_state : bool
|
||||
Whether or not the permission state should be changed as
|
||||
a result of this call. For most cases this should be
|
||||
``False``. Defaults to ``False``.
|
||||
|
||||
"""
|
||||
ret = await super().can_run(ctx)
|
||||
if ret is False:
|
||||
@@ -171,8 +190,21 @@ class Command(CogCommandMixin, commands.Command):
|
||||
# This is so contexts invoking other commands can be checked with
|
||||
# this command as well
|
||||
original_command = ctx.command
|
||||
original_state = ctx.permission_state
|
||||
ctx.command = self
|
||||
|
||||
if check_all_parents is True:
|
||||
# Since we're starting from the beginning, we should reset the state to normal
|
||||
ctx.permission_state = PermState.NORMAL
|
||||
for parent in reversed(self.parents):
|
||||
try:
|
||||
result = await parent.can_run(ctx, change_permission_state=True)
|
||||
except commands.CommandError:
|
||||
result = False
|
||||
|
||||
if result is False:
|
||||
return False
|
||||
|
||||
if self.parent is None and self.instance is not None:
|
||||
# For top-level commands, we need to check the cog's requires too
|
||||
ret = await self.instance.requires.verify(ctx)
|
||||
@@ -183,6 +215,17 @@ class Command(CogCommandMixin, commands.Command):
|
||||
return await self.requires.verify(ctx)
|
||||
finally:
|
||||
ctx.command = original_command
|
||||
if not change_permission_state:
|
||||
ctx.permission_state = original_state
|
||||
|
||||
async def _verify_checks(self, ctx):
|
||||
if not self.enabled:
|
||||
raise commands.DisabledCommand(f"{self.name} command is disabled")
|
||||
|
||||
if not (await self.can_run(ctx, change_permission_state=True)):
|
||||
raise commands.CheckFailure(
|
||||
f"The check functions for command {self.qualified_name} failed."
|
||||
)
|
||||
|
||||
async def do_conversion(
|
||||
self, ctx: "Context", converter, argument: str, param: inspect.Parameter
|
||||
@@ -238,7 +281,9 @@ class Command(CogCommandMixin, commands.Command):
|
||||
if cmd.hidden:
|
||||
return False
|
||||
try:
|
||||
can_run = await self.can_run(ctx)
|
||||
can_run = await self.can_run(
|
||||
ctx, check_all_parents=True, change_permission_state=False
|
||||
)
|
||||
except commands.CheckFailure:
|
||||
return False
|
||||
else:
|
||||
|
||||
@@ -281,12 +281,14 @@ class Requires:
|
||||
|
||||
if isinstance(user_perms, dict):
|
||||
self.user_perms: Optional[discord.Permissions] = discord.Permissions.none()
|
||||
_validate_perms_dict(user_perms)
|
||||
self.user_perms.update(**user_perms)
|
||||
else:
|
||||
self.user_perms = user_perms
|
||||
|
||||
if isinstance(bot_perms, dict):
|
||||
self.bot_perms: discord.Permissions = discord.Permissions.none()
|
||||
_validate_perms_dict(bot_perms)
|
||||
self.bot_perms.update(**bot_perms)
|
||||
else:
|
||||
self.bot_perms = bot_perms
|
||||
@@ -311,6 +313,7 @@ class Requires:
|
||||
if user_perms is None:
|
||||
func.requires.user_perms = None
|
||||
else:
|
||||
_validate_perms_dict(user_perms)
|
||||
func.requires.user_perms.update(**user_perms)
|
||||
return func
|
||||
|
||||
@@ -449,7 +452,20 @@ class Requires:
|
||||
should_invoke = await self._verify_user(ctx)
|
||||
elif isinstance(next_state, dict):
|
||||
# NORMAL to PASSIVE_ALLOW; should we proceed as normal or transition?
|
||||
next_state = next_state[await self._verify_user(ctx)]
|
||||
# We must check what would happen normally, if no explicit rules were set.
|
||||
default_rule = PermState.NORMAL
|
||||
if ctx.guild is not None:
|
||||
default_rule = self.get_default_guild_rule(guild_id=ctx.guild.id)
|
||||
if default_rule is PermState.NORMAL:
|
||||
default_rule = self.default_global_rule
|
||||
|
||||
if default_rule == PermState.ACTIVE_DENY:
|
||||
would_invoke = False
|
||||
elif default_rule == PermState.ACTIVE_ALLOW:
|
||||
would_invoke = True
|
||||
else:
|
||||
would_invoke = await self._verify_user(ctx)
|
||||
next_state = next_state[would_invoke]
|
||||
|
||||
ctx.permission_state = next_state
|
||||
return should_invoke
|
||||
@@ -588,6 +604,7 @@ def bot_has_permissions(**perms: bool):
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
func.__requires_bot_perms__ = perms
|
||||
else:
|
||||
_validate_perms_dict(perms)
|
||||
func.requires.bot_perms.update(**perms)
|
||||
return func
|
||||
|
||||
@@ -599,6 +616,8 @@ def has_permissions(**perms: bool):
|
||||
|
||||
This check can be overridden by rules.
|
||||
"""
|
||||
if perms is None:
|
||||
raise TypeError("Must provide at least one keyword argument to has_permissions")
|
||||
return Requires.get_decorator(None, perms)
|
||||
|
||||
|
||||
@@ -670,3 +689,20 @@ class _IntKeyDict(Dict[int, _T]):
|
||||
if not isinstance(key, int):
|
||||
raise TypeError("Keys must be of type `int`")
|
||||
return super().__setitem__(key, value)
|
||||
|
||||
|
||||
def _validate_perms_dict(perms: Dict[str, bool]) -> None:
|
||||
for perm, value in perms.items():
|
||||
try:
|
||||
attr = getattr(discord.Permissions, perm)
|
||||
except AttributeError:
|
||||
attr = None
|
||||
|
||||
if attr is None or not isinstance(attr, property):
|
||||
# We reject invalid permissions
|
||||
raise TypeError(f"Unknown permission name '{perm}'")
|
||||
|
||||
if value is not True:
|
||||
# We reject any permission not specified as 'True', since this is the only value which
|
||||
# makes practical sense.
|
||||
raise TypeError(f"Permission {perm} may only be specified as 'True', not {value}")
|
||||
|
||||
@@ -468,7 +468,7 @@ class Core(commands.Cog, CoreLogic):
|
||||
|
||||
pred = MessagePredicate.yes_or_no(ctx)
|
||||
try:
|
||||
await self.bot.wait_for("message", check=MessagePredicate.yes_or_no(ctx))
|
||||
await self.bot.wait_for("message", check=pred)
|
||||
except asyncio.TimeoutError:
|
||||
await ctx.send("Response timed out.")
|
||||
return
|
||||
@@ -1729,7 +1729,7 @@ class Core(commands.Cog, CoreLogic):
|
||||
await ctx.tick()
|
||||
|
||||
@commands.guild_only()
|
||||
@checks.guildowner_or_permissions(manage_server=True)
|
||||
@checks.guildowner_or_permissions(manage_guild=True)
|
||||
@commands.group(name="autoimmune")
|
||||
async def autoimmune_group(self, ctx: commands.Context):
|
||||
"""
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from copy import deepcopy
|
||||
import hashlib
|
||||
import shutil
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
|
||||
import appdirs
|
||||
import tempfile
|
||||
from discord.utils import deprecated
|
||||
|
||||
from . import commands
|
||||
from .json_io import JsonIO
|
||||
|
||||
__all__ = [
|
||||
@@ -153,124 +153,28 @@ def core_data_path() -> Path:
|
||||
return core_path.resolve()
|
||||
|
||||
|
||||
def _find_data_files(init_location: str) -> (Path, List[Path]):
|
||||
"""
|
||||
Discovers all files in the bundled data folder of an installed cog.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
init_location
|
||||
|
||||
Returns
|
||||
-------
|
||||
(pathlib.Path, list of pathlib.Path)
|
||||
"""
|
||||
init_file = Path(init_location)
|
||||
if not init_file.is_file():
|
||||
return []
|
||||
|
||||
package_folder = init_file.parent.resolve() / "data"
|
||||
if not package_folder.is_dir():
|
||||
return []
|
||||
|
||||
all_files = list(package_folder.rglob("*"))
|
||||
|
||||
return package_folder, [p.resolve() for p in all_files if p.is_file()]
|
||||
|
||||
|
||||
def _compare_and_copy(to_copy: List[Path], bundled_data_dir: Path, cog_data_dir: Path):
|
||||
"""
|
||||
Filters out files from ``to_copy`` that already exist, and are the
|
||||
same, in ``data_dir``. The files that are different are copied into
|
||||
``data_dir``.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
to_copy : list of pathlib.Path
|
||||
bundled_data_dir : pathlib.Path
|
||||
cog_data_dir : pathlib.Path
|
||||
"""
|
||||
|
||||
def hash_bytestr_iter(bytesiter, hasher, ashexstr=False):
|
||||
for block in bytesiter:
|
||||
hasher.update(block)
|
||||
return hasher.hexdigest() if ashexstr else hasher.digest()
|
||||
|
||||
def file_as_blockiter(afile, blocksize=65536):
|
||||
with afile:
|
||||
block = afile.read(blocksize)
|
||||
while len(block) > 0:
|
||||
yield block
|
||||
block = afile.read(blocksize)
|
||||
|
||||
lookup = {p: cog_data_dir.joinpath(p.relative_to(bundled_data_dir)) for p in to_copy}
|
||||
|
||||
for orig, poss_existing in lookup.items():
|
||||
if not poss_existing.is_file():
|
||||
poss_existing.parent.mkdir(exist_ok=True, parents=True)
|
||||
exists_checksum = None
|
||||
else:
|
||||
exists_checksum = hash_bytestr_iter(
|
||||
file_as_blockiter(poss_existing.open("rb")), hashlib.sha256()
|
||||
)
|
||||
|
||||
orig_checksum = ...
|
||||
if exists_checksum is not None:
|
||||
orig_checksum = hash_bytestr_iter(file_as_blockiter(orig.open("rb")), hashlib.sha256())
|
||||
|
||||
if exists_checksum != orig_checksum:
|
||||
shutil.copy(str(orig), str(poss_existing))
|
||||
log.debug("Copying {} to {}".format(orig, poss_existing))
|
||||
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
@deprecated("bundled_data_path() without calling this function")
|
||||
def load_bundled_data(cog_instance, init_location: str):
|
||||
pass
|
||||
|
||||
|
||||
def bundled_data_path(cog_instance: commands.Cog) -> Path:
|
||||
"""
|
||||
This function copies (and overwrites) data from the ``data/`` folder
|
||||
of the installed cog.
|
||||
Get the path to the "data" directory bundled with this cog.
|
||||
|
||||
The bundled data folder must be located alongside the ``.py`` file
|
||||
which contains the cog class.
|
||||
|
||||
.. important::
|
||||
|
||||
This function MUST be called from the ``setup()`` function of your
|
||||
cog.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> from redbot.core import data_manager
|
||||
>>>
|
||||
>>> def setup(bot):
|
||||
>>> cog = MyCog()
|
||||
>>> data_manager.load_bundled_data(cog, __file__)
|
||||
>>> bot.add_cog(cog)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cog_instance
|
||||
An instance of your cog class.
|
||||
init_location : str
|
||||
The ``__file__`` attribute of the file where your ``setup()``
|
||||
function exists.
|
||||
"""
|
||||
bundled_data_folder, to_copy = _find_data_files(init_location)
|
||||
|
||||
cog_data_folder = cog_data_path(cog_instance) / "bundled_data"
|
||||
|
||||
_compare_and_copy(to_copy, bundled_data_folder, cog_data_folder)
|
||||
|
||||
|
||||
def bundled_data_path(cog_instance) -> Path:
|
||||
"""
|
||||
The "data" directory that has been copied from installed cogs.
|
||||
|
||||
.. important::
|
||||
|
||||
You should *NEVER* write to this directory. Data manager will
|
||||
overwrite files in this directory each time `load_bundled_data`
|
||||
is called. You should instead write to the directory provided by
|
||||
`cog_data_path`.
|
||||
You should *NEVER* write to this directory.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cog_instance
|
||||
An instance of your cog. If calling from a command or method of
|
||||
your cog, this should be ``self``.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -280,10 +184,10 @@ def bundled_data_path(cog_instance) -> Path:
|
||||
Raises
|
||||
------
|
||||
FileNotFoundError
|
||||
If no bundled data folder exists or if it hasn't been loaded yet.
|
||||
"""
|
||||
If no bundled data folder exists.
|
||||
|
||||
bundled_path = cog_data_path(cog_instance) / "bundled_data"
|
||||
"""
|
||||
bundled_path = Path(inspect.getfile(cog_instance.__class__)).parent / "data"
|
||||
|
||||
if not bundled_path.is_dir():
|
||||
raise FileNotFoundError("No such directory {}".format(bundled_path))
|
||||
|
||||
@@ -23,6 +23,7 @@ discord.py 1.0.0a
|
||||
|
||||
This help formatter contains work by Rapptz (Danny) and SirThane#1780.
|
||||
"""
|
||||
import contextlib
|
||||
from collections import namedtuple
|
||||
from typing import List, Optional, Union
|
||||
|
||||
@@ -224,8 +225,8 @@ class Help(dpy_formatter.HelpFormatter):
|
||||
|
||||
return ret
|
||||
|
||||
async def format_help_for(self, ctx, command_or_bot, reason: str = None):
|
||||
"""Formats the help page and handles the actual heavy lifting of how ### WTF HAPPENED?
|
||||
async def format_help_for(self, ctx, command_or_bot, reason: str = ""):
|
||||
"""Formats the help page and handles the actual heavy lifting of how
|
||||
the help command looks like. To change the behaviour, override the
|
||||
:meth:`~.HelpFormatter.format` method.
|
||||
|
||||
@@ -244,10 +245,24 @@ class Help(dpy_formatter.HelpFormatter):
|
||||
"""
|
||||
self.context = ctx
|
||||
self.command = command_or_bot
|
||||
|
||||
# We want the permission state to be set as if the author had run the command he is
|
||||
# requesting help for. This is so the subcommands shown in the help menu correctly reflect
|
||||
# any permission rules set.
|
||||
if isinstance(self.command, commands.Command):
|
||||
with contextlib.suppress(commands.CommandError):
|
||||
await self.command.can_run(
|
||||
self.context, check_all_parents=True, change_permission_state=True
|
||||
)
|
||||
elif isinstance(self.command, commands.Cog):
|
||||
with contextlib.suppress(commands.CommandError):
|
||||
# Cog's don't have a `can_run` method, so we use the `Requires` object directly.
|
||||
await self.command.requires.verify(self.context)
|
||||
|
||||
emb = await self.format()
|
||||
|
||||
if reason:
|
||||
emb["embed"]["title"] = "{0}".format(reason)
|
||||
emb["embed"]["title"] = reason
|
||||
|
||||
ret = []
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
import os
|
||||
import asyncio
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from uuid import uuid4
|
||||
|
||||
# This is basically our old DataIO and just a base for much more elaborate classes
|
||||
@@ -69,7 +70,11 @@ class JsonIO:
|
||||
|
||||
async def _threadsafe_save_json(self, data, settings=PRETTY):
|
||||
loop = asyncio.get_event_loop()
|
||||
func = functools.partial(self._save_json, data, settings)
|
||||
# the deepcopy is needed here. otherwise,
|
||||
# the dict can change during serialization
|
||||
# and this will break the encoder.
|
||||
data_copy = deepcopy(data)
|
||||
func = functools.partial(self._save_json, data_copy, settings)
|
||||
async with self._lock:
|
||||
await loop.run_in_executor(None, func)
|
||||
|
||||
|
||||
@@ -666,29 +666,30 @@ async def register_casetypes(new_types: List[dict]) -> List[CaseType]:
|
||||
return type_list
|
||||
|
||||
|
||||
async def get_modlog_channel(guild: discord.Guild) -> Union[discord.TextChannel, None]:
|
||||
async def get_modlog_channel(guild: discord.Guild) -> discord.TextChannel:
|
||||
"""
|
||||
Get the current modlog channel
|
||||
Get the current modlog channel.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
guild: `discord.Guild`
|
||||
The guild to get the modlog channel for
|
||||
The guild to get the modlog channel for.
|
||||
|
||||
Returns
|
||||
-------
|
||||
`discord.TextChannel` or `None`
|
||||
The channel object representing the modlog channel
|
||||
`discord.TextChannel`
|
||||
The channel object representing the modlog channel.
|
||||
|
||||
Raises
|
||||
------
|
||||
RuntimeError
|
||||
If the modlog channel is not found
|
||||
If the modlog channel is not found.
|
||||
|
||||
"""
|
||||
if hasattr(guild, "get_channel"):
|
||||
channel = guild.get_channel(await _conf.guild(guild).mod_log())
|
||||
else:
|
||||
# For unit tests only
|
||||
channel = await _conf.guild(guild).mod_log()
|
||||
if channel is None:
|
||||
raise RuntimeError("Failed to get the mod log channel!")
|
||||
|
||||
Reference in New Issue
Block a user