From 1cb43b11a129b6b5f9c33088a4d4f4a8e6abee61 Mon Sep 17 00:00:00 2001 From: DiscordLiz <47602820+DiscordLiz@users.noreply.github.com> Date: Tue, 14 Jan 2020 22:17:54 -0500 Subject: [PATCH] Some old work and some new (#3362) * Some old work, some new * c:style * remove wrong version --- docs/framework_utils.rst | 6 + redbot/cogs/mod/kickban.py | 44 ++- redbot/cogs/mod/mod.py | 2 - redbot/cogs/mod/mutes.py | 465 --------------------------- redbot/cogs/mutes/__init__.py | 5 + redbot/cogs/mutes/errors.py | 21 ++ redbot/cogs/mutes/mutes.py | 410 +++++++++++++++++++++++ redbot/cogs/mutes/utils.py | 53 +++ redbot/core/utils/discord_helpers.py | 152 +++++++++ 9 files changed, 690 insertions(+), 468 deletions(-) delete mode 100644 redbot/cogs/mod/mutes.py create mode 100644 redbot/cogs/mutes/__init__.py create mode 100644 redbot/cogs/mutes/errors.py create mode 100644 redbot/cogs/mutes/mutes.py create mode 100644 redbot/cogs/mutes/utils.py create mode 100644 redbot/core/utils/discord_helpers.py diff --git a/docs/framework_utils.rst b/docs/framework_utils.rst index 468d880f0..186ff44ee 100644 --- a/docs/framework_utils.rst +++ b/docs/framework_utils.rst @@ -51,3 +51,9 @@ Common Filters .. automodule:: redbot.core.utils.common_filters :members: + +Discord Helper Classes +====================== + +.. automodule:: redbot.core.utils.discord_helpers + :members: diff --git a/redbot/cogs/mod/kickban.py b/redbot/cogs/mod/kickban.py index 37182518c..a41ce2948 100644 --- a/redbot/cogs/mod/kickban.py +++ b/redbot/cogs/mod/kickban.py @@ -7,7 +7,7 @@ from typing import cast, Optional, Union import discord from redbot.core import commands, i18n, checks, modlog -from redbot.core.utils.chat_formatting import pagify, humanize_number +from redbot.core.utils.chat_formatting import pagify, humanize_number, format_perms_list from redbot.core.utils.mod import is_allowed_by_hierarchy, get_audit_reason from .abc import MixinMeta from .converters import RawUserIds @@ -21,6 +21,48 @@ class KickBanMixin(MixinMeta): Kick and ban commands and tasks go here. """ + @staticmethod + async def _voice_perm_check( + ctx: commands.Context, user_voice_state: Optional[discord.VoiceState], **perms: bool + ) -> bool: + """Check if the bot and user have sufficient permissions for voicebans. + + This also verifies that the user's voice state and connected + channel are not ``None``. + + Returns + ------- + bool + ``True`` if the permissions are sufficient and the user has + a valid voice state. + + """ + if user_voice_state is None or user_voice_state.channel is None: + await ctx.send(_("That user is not in a voice channel.")) + return False + voice_channel: discord.VoiceChannel = user_voice_state.channel + required_perms = discord.Permissions() + required_perms.update(**perms) + if not voice_channel.permissions_for(ctx.me) >= required_perms: + await ctx.send( + _("I require the {perms} permission(s) in that user's channel to do that.").format( + perms=format_perms_list(required_perms) + ) + ) + return False + if ( + ctx.permission_state is commands.PermState.NORMAL + and not voice_channel.permissions_for(ctx.author) >= required_perms + ): + await ctx.send( + _( + "You must have the {perms} permission(s) in that user's channel to use this " + "command." + ).format(perms=format_perms_list(required_perms)) + ) + return False + return True + @staticmethod async def get_invite_for_reinvite(ctx: commands.Context, max_age: int = 86400): """Handles the reinvite logic for getting an invite diff --git a/redbot/cogs/mod/mod.py b/redbot/cogs/mod/mod.py index 95ad52baa..4adfb3900 100644 --- a/redbot/cogs/mod/mod.py +++ b/redbot/cogs/mod/mod.py @@ -10,7 +10,6 @@ from .casetypes import CASETYPES from .events import Events from .kickban import KickBanMixin from .movetocore import MoveToCore -from .mutes import MuteMixin from .names import ModInfo from .slowmode import Slowmode from .settings import ModSettings @@ -35,7 +34,6 @@ class Mod( Events, KickBanMixin, MoveToCore, - MuteMixin, ModInfo, Slowmode, commands.Cog, diff --git a/redbot/cogs/mod/mutes.py b/redbot/cogs/mod/mutes.py deleted file mode 100644 index 5ee6126cc..000000000 --- a/redbot/cogs/mod/mutes.py +++ /dev/null @@ -1,465 +0,0 @@ -import asyncio -from typing import cast, Optional - -import discord -from redbot.core import commands, checks, i18n, modlog -from redbot.core.utils.chat_formatting import format_perms_list -from redbot.core.utils.mod import get_audit_reason, is_allowed_by_hierarchy -from .abc import MixinMeta - -T_ = i18n.Translator("Mod", __file__) - -_ = lambda s: s -mute_unmute_issues = { - "already_muted": _("That user can't send messages in this channel."), - "already_unmuted": _("That user isn't muted in this channel."), - "hierarchy_problem": _( - "I cannot let you do that. You are not higher than the user in the role hierarchy." - ), - "is_admin": _("That user cannot be muted, as they have the Administrator permission."), - "permissions_issue": _( - "Failed to mute user. I need the manage roles " - "permission and the user I'm muting must be " - "lower than myself in the role hierarchy." - ), -} -_ = T_ - - -class MuteMixin(MixinMeta): - """ - Stuff for mutes goes here - """ - - @staticmethod - async def _voice_perm_check( - ctx: commands.Context, user_voice_state: Optional[discord.VoiceState], **perms: bool - ) -> bool: - """Check if the bot and user have sufficient permissions for voicebans. - - This also verifies that the user's voice state and connected - channel are not ``None``. - - Returns - ------- - bool - ``True`` if the permissions are sufficient and the user has - a valid voice state. - - """ - if user_voice_state is None or user_voice_state.channel is None: - await ctx.send(_("That user is not in a voice channel.")) - return False - voice_channel: discord.VoiceChannel = user_voice_state.channel - required_perms = discord.Permissions() - required_perms.update(**perms) - if not voice_channel.permissions_for(ctx.me) >= required_perms: - await ctx.send( - _("I require the {perms} permission(s) in that user's channel to do that.").format( - perms=format_perms_list(required_perms) - ) - ) - return False - if ( - ctx.permission_state is commands.PermState.NORMAL - and not voice_channel.permissions_for(ctx.author) >= required_perms - ): - await ctx.send( - _( - "You must have the {perms} permission(s) in that user's channel to use this " - "command." - ).format(perms=format_perms_list(required_perms)) - ) - return False - return True - - @commands.command() - @commands.guild_only() - @checks.admin_or_permissions(mute_members=True, deafen_members=True) - async def voiceunban(self, ctx: commands.Context, user: discord.Member, *, reason: str = None): - """Unban a user from speaking and listening in the server's voice channels.""" - user_voice_state = user.voice - if ( - await self._voice_perm_check( - ctx, user_voice_state, deafen_members=True, mute_members=True - ) - is False - ): - return - needs_unmute = True if user_voice_state.mute else False - needs_undeafen = True if user_voice_state.deaf else False - audit_reason = get_audit_reason(ctx.author, reason) - if needs_unmute and needs_undeafen: - await user.edit(mute=False, deafen=False, reason=audit_reason) - elif needs_unmute: - await user.edit(mute=False, reason=audit_reason) - elif needs_undeafen: - await user.edit(deafen=False, reason=audit_reason) - else: - await ctx.send(_("That user isn't muted or deafened by the server!")) - return - - guild = ctx.guild - author = ctx.author - try: - await modlog.create_case( - self.bot, - guild, - ctx.message.created_at, - "voiceunban", - user, - author, - reason, - until=None, - channel=None, - ) - except RuntimeError as e: - await ctx.send(e) - await ctx.send(_("User is now allowed to speak and listen in voice channels")) - - @commands.command() - @commands.guild_only() - @checks.admin_or_permissions(mute_members=True, deafen_members=True) - async def voiceban(self, ctx: commands.Context, user: discord.Member, *, reason: str = None): - """Ban a user from speaking and listening in the server's voice channels.""" - user_voice_state: discord.VoiceState = user.voice - if ( - await self._voice_perm_check( - ctx, user_voice_state, deafen_members=True, mute_members=True - ) - is False - ): - return - needs_mute = True if user_voice_state.mute is False else False - needs_deafen = True if user_voice_state.deaf is False else False - audit_reason = get_audit_reason(ctx.author, reason) - author = ctx.author - guild = ctx.guild - if needs_mute and needs_deafen: - await user.edit(mute=True, deafen=True, reason=audit_reason) - elif needs_mute: - await user.edit(mute=True, reason=audit_reason) - elif needs_deafen: - await user.edit(deafen=True, reason=audit_reason) - else: - await ctx.send(_("That user is already muted and deafened server-wide!")) - return - - try: - await modlog.create_case( - self.bot, - guild, - ctx.message.created_at, - "voiceban", - user, - author, - reason, - until=None, - channel=None, - ) - except RuntimeError as e: - await ctx.send(e) - await ctx.send(_("User has been banned from speaking or listening in voice channels")) - - @commands.group() - @commands.guild_only() - @checks.mod_or_permissions(manage_channels=True) - async def mute(self, ctx: commands.Context): - """Mute users.""" - pass - - @mute.command(name="voice") - @commands.guild_only() - async def voice_mute(self, ctx: commands.Context, user: discord.Member, *, reason: str = None): - """Mute a user in their current voice channel.""" - user_voice_state = user.voice - if ( - await self._voice_perm_check( - ctx, user_voice_state, mute_members=True, manage_channels=True - ) - is False - ): - return - guild = ctx.guild - author = ctx.author - channel = user_voice_state.channel - audit_reason = get_audit_reason(author, reason) - - success, issue = await self.mute_user(guild, channel, author, user, audit_reason) - - if success: - try: - await modlog.create_case( - self.bot, - guild, - ctx.message.created_at, - "vmute", - user, - author, - reason, - until=None, - channel=channel, - ) - except RuntimeError as e: - await ctx.send(e) - await ctx.send( - _("Muted {user} in channel {channel.name}").format(user=user, channel=channel) - ) - else: - await ctx.send(issue) - - @mute.command(name="channel") - @commands.guild_only() - @commands.bot_has_permissions(manage_roles=True) - @checks.mod_or_permissions(administrator=True) - async def channel_mute( - self, ctx: commands.Context, user: discord.Member, *, reason: str = None - ): - """Mute a user in the current text channel.""" - author = ctx.message.author - channel = ctx.message.channel - guild = ctx.guild - audit_reason = get_audit_reason(author, reason) - - success, issue = await self.mute_user(guild, channel, author, user, audit_reason) - - if success: - try: - await modlog.create_case( - self.bot, - guild, - ctx.message.created_at, - "cmute", - user, - author, - reason, - until=None, - channel=channel, - ) - except RuntimeError as e: - await ctx.send(e) - await channel.send(_("User has been muted in this channel.")) - else: - await channel.send(issue) - - @mute.command(name="server", aliases=["guild"]) - @commands.guild_only() - @commands.bot_has_permissions(manage_roles=True) - @checks.mod_or_permissions(administrator=True) - async def guild_mute(self, ctx: commands.Context, user: discord.Member, *, reason: str = None): - """Mutes user in the server""" - author = ctx.message.author - guild = ctx.guild - audit_reason = get_audit_reason(author, reason) - - mute_success = [] - for channel in guild.channels: - success, issue = await self.mute_user(guild, channel, author, user, audit_reason) - mute_success.append((success, issue)) - await asyncio.sleep(0.1) - try: - await modlog.create_case( - self.bot, - guild, - ctx.message.created_at, - "smute", - user, - author, - reason, - until=None, - channel=None, - ) - except RuntimeError as e: - await ctx.send(e) - await ctx.send(_("User has been muted in this server.")) - - @commands.group() - @commands.guild_only() - @commands.bot_has_permissions(manage_roles=True) - @checks.mod_or_permissions(manage_channels=True) - async def unmute(self, ctx: commands.Context): - """Unmute users.""" - pass - - @unmute.command(name="voice") - @commands.guild_only() - async def unmute_voice( - self, ctx: commands.Context, user: discord.Member, *, reason: str = None - ): - """Unmute a user in their current voice channel.""" - user_voice_state = user.voice - if ( - await self._voice_perm_check( - ctx, user_voice_state, mute_members=True, manage_channels=True - ) - is False - ): - return - guild = ctx.guild - author = ctx.author - channel = user_voice_state.channel - audit_reason = get_audit_reason(author, reason) - - success, message = await self.unmute_user(guild, channel, author, user, audit_reason) - - if success: - try: - await modlog.create_case( - self.bot, - guild, - ctx.message.created_at, - "vunmute", - user, - author, - reason, - until=None, - channel=channel, - ) - except RuntimeError as e: - await ctx.send(e) - await ctx.send( - _("Unmuted {user} in channel {channel.name}").format(user=user, channel=channel) - ) - else: - await ctx.send(_("Unmute failed. Reason: {}").format(message)) - - @checks.mod_or_permissions(administrator=True) - @unmute.command(name="channel") - @commands.bot_has_permissions(manage_roles=True) - @commands.guild_only() - async def unmute_channel( - self, ctx: commands.Context, user: discord.Member, *, reason: str = None - ): - """Unmute a user in this channel.""" - channel = ctx.channel - author = ctx.author - guild = ctx.guild - audit_reason = get_audit_reason(author, reason) - - success, message = await self.unmute_user(guild, channel, author, user, audit_reason) - - if success: - try: - await modlog.create_case( - self.bot, - guild, - ctx.message.created_at, - "cunmute", - user, - author, - reason, - until=None, - channel=channel, - ) - except RuntimeError as e: - await ctx.send(e) - await ctx.send(_("User unmuted in this channel.")) - else: - await ctx.send(_("Unmute failed. Reason: {}").format(message)) - - @checks.mod_or_permissions(administrator=True) - @unmute.command(name="server", aliases=["guild"]) - @commands.bot_has_permissions(manage_roles=True) - @commands.guild_only() - async def unmute_guild( - self, ctx: commands.Context, user: discord.Member, *, reason: str = None - ): - """Unmute a user in this server.""" - guild = ctx.guild - author = ctx.author - audit_reason = get_audit_reason(author, reason) - - unmute_success = [] - for channel in guild.channels: - success, message = await self.unmute_user(guild, channel, author, user, audit_reason) - unmute_success.append((success, message)) - await asyncio.sleep(0.1) - try: - await modlog.create_case( - self.bot, - guild, - ctx.message.created_at, - "sunmute", - user, - author, - reason, - until=None, - ) - except RuntimeError as e: - await ctx.send(e) - await ctx.send(_("User has been unmuted in this server.")) - - async def mute_user( - self, - guild: discord.Guild, - channel: discord.abc.GuildChannel, - author: discord.Member, - user: discord.Member, - reason: str, - ) -> (bool, str): - """Mutes the specified user in the specified channel""" - overwrites = channel.overwrites_for(user) - permissions = channel.permissions_for(user) - - if permissions.administrator: - return False, _(mute_unmute_issues["is_admin"]) - - new_overs = {} - if not isinstance(channel, discord.TextChannel): - new_overs.update(speak=False) - if not isinstance(channel, discord.VoiceChannel): - new_overs.update(send_messages=False, add_reactions=False) - - if all(getattr(permissions, p) is False for p in new_overs.keys()): - return False, _(mute_unmute_issues["already_muted"]) - - elif not await is_allowed_by_hierarchy(self.bot, self.settings, guild, author, user): - return False, _(mute_unmute_issues["hierarchy_problem"]) - - old_overs = {k: getattr(overwrites, k) for k in new_overs} - overwrites.update(**new_overs) - try: - await channel.set_permissions(user, overwrite=overwrites, reason=reason) - except discord.Forbidden: - return False, _(mute_unmute_issues["permissions_issue"]) - else: - await self.settings.member(user).set_raw( - "perms_cache", str(channel.id), value=old_overs - ) - return True, None - - async def unmute_user( - self, - guild: discord.Guild, - channel: discord.abc.GuildChannel, - author: discord.Member, - user: discord.Member, - reason: str, - ) -> (bool, str): - overwrites = channel.overwrites_for(user) - perms_cache = await self.settings.member(user).perms_cache() - - if channel.id in perms_cache: - old_values = perms_cache[channel.id] - else: - old_values = {"send_messages": None, "add_reactions": None, "speak": None} - - if all(getattr(overwrites, k) == v for k, v in old_values.items()): - return False, _(mute_unmute_issues["already_unmuted"]) - - elif not await is_allowed_by_hierarchy(self.bot, self.settings, guild, author, user): - return False, _(mute_unmute_issues["hierarchy_problem"]) - - overwrites.update(**old_values) - try: - if overwrites.is_empty(): - await channel.set_permissions( - user, overwrite=cast(discord.PermissionOverwrite, None), reason=reason - ) - else: - await channel.set_permissions(user, overwrite=overwrites, reason=reason) - except discord.Forbidden: - return False, _(mute_unmute_issues["permissions_issue"]) - else: - await self.settings.member(user).clear_raw("perms_cache", str(channel.id)) - return True, None diff --git a/redbot/cogs/mutes/__init__.py b/redbot/cogs/mutes/__init__.py new file mode 100644 index 000000000..a2e0de2f6 --- /dev/null +++ b/redbot/cogs/mutes/__init__.py @@ -0,0 +1,5 @@ +from .mutes import Mutes + + +def setup(bot): + bot.add_cog(Mutes(bot)) diff --git a/redbot/cogs/mutes/errors.py b/redbot/cogs/mutes/errors.py new file mode 100644 index 000000000..a927d291b --- /dev/null +++ b/redbot/cogs/mutes/errors.py @@ -0,0 +1,21 @@ +class ControlFlowException(Exception): + """ + The base exception for any exceptions used solely for control flow + If this or any subclass of this ever propogates, something has gone wrong. + """ + + pass + + +class NoChangeError(ControlFlowException): + pass + + +class PermError(ControlFlowException): + """ + An error to be raised when a permission issue is detected prior to an api call being made + """ + + def __init__(self, friendly_error=None, *args): + self.friendly_error = friendly_error + super().__init__(*args) diff --git a/redbot/cogs/mutes/mutes.py b/redbot/cogs/mutes/mutes.py new file mode 100644 index 000000000..f810a2087 --- /dev/null +++ b/redbot/cogs/mutes/mutes.py @@ -0,0 +1,410 @@ +from __future__ import annotations + +import asyncio +import logging +from datetime import timedelta, datetime +from typing import Awaitable, Dict, NamedTuple, Optional, Tuple, Union, no_type_check + +import discord + +from redbot.core import commands, checks, modlog +from redbot.core.commands import TimedeltaConverter +from redbot.core.config import Config +from redbot.core.i18n import Translator, cog_i18n +from redbot.core.utils.discord_helpers import OverwriteDiff +from redbot.core.data_manager import cog_data_path +from redbot.core.utils.dbtools import APSWConnectionWrapper as Connection + +from . import utils +from .errors import NoChangeError, PermError + +TaskDict = Dict[Tuple[int, int], asyncio.Task] + +_ = Translator("Mutes", __file__) +log = logging.getLogger("red.mutes") + + +@cog_i18n(_) +class Mutes(commands.Cog): + """ + A cog to mute users with. + """ + + def __init__(self, bot): + self.bot = bot + self.conn = Connection(cog_data_path(self) / "mutes.db") + self.config = Config.get_conf(self, identifier=240961564503441410) + self.config.register_guild( + mute_deny_text=2112, # send, react + mute_deny_voice=2097152, # speak + excluded_channel_ids=[], + ) + self._unmute_task = asyncio.create_task(self.unmute_loop()) + self._task_queue = asyncio.Queue() + self._server_unmute_tasks: TaskDict = {} + self._channel_unmute_tasks: TaskDict = {} + self._ready = asyncio.Event() + self.bot.loop.create_task(self._cog_init()) + + async def _cog_init(self): + with self.conn.with_cursor() as cursor: + cursor.execute("""PRAGMA journal_mode=wal""") + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS mutes( + user_id INTEGER NOT NULL, + channel_id INTEGER NOT NULL, + guild_id INTEGER NOT NULL, + allows_added INTEGER, + allows_removed INTEGER, + denies_added INTEGER, + denies_removed INTEGER, + expires_at INTEGER, + PRIMARY KEY (user_id, channel_id) + ); + """ + ) + self._ready.set() + + async def cog_before_invoke(self): + await self._ready.wait() + + def cog_unload(self): + self.unmute_task.cancel() + for task in self._server_unmute_tasks.values(): + task.cancel() + for task in self._channel_unmute_tasks.values(): + task.cancel() + + def _clean_task_dict(self, task_dict): + + is_debug = log.getEffectiveLevel() <= logging.DEBUG + + for k in list(task_dict.keys()): + task = task_dict[k] + + if task.canceled(): + task_dict.pop(k, None) + continue + + if task.done(): + try: + r = task.result() + except Exception: + # Log exception info for dead tasks, but only while debugging. + if is_debug: + log.exception("Dead server unmute task.") + task_dict.pop(k, None) + + async def unmute_loop(self): + await self.bot.wait_until_ready() + while True: + async with self._task_lock: + self._clean_task_dict(self._server_unmute_tasks) + self._clean_task_dict(self._channel_unmute_tasks) + await self._schedule_unmutes(300) + await asyncio.sleep(300) + + async def _schedule_unmutes(self, schedule_by_seconds: int = 300): + """ + Schedules unmuting. + Mutes get scheduled as tasks so that mute extensions or changes to make a mute + permanent can have a scheduled mute be canceled. + """ + raise NotImplementedError() # TODO + + async def _cancel_channel_mute_delayed(self, *, delay: float, channel_id: int, member_id: int): + """ + After a delay, attempt to unmute someone + """ + raise NotImplementedError() # TODO + + async def _cancel_server_mute_delayed(self, *, delay: float, guild_id: int, member_id: int): + """ + After a delay, attempt to unmute someone. + """ + await asyncio.sleep(delay) + + guild = self.bot.get_guild(guild_id) + if not guild: + return + + member = guild.get_member(member_id) + + if not member: # Still clear this to avoid re-muting on-join after expiration. + pass + + # TODO + + @staticmethod + async def channel_mute_with_diff( + *, + channel: discord.abc.GuildChannel, + target: Union[discord.Role, discord.Member], + deny_value: int, + reason: Optional[str] = None, + ) -> OverwriteDiff: + """ + Parameters + ---------- + channel : discord.abc.GuildChannel + target : Union[discord.Role, discord.Member] + deny_value : int + The permissions values which should be denied. + reason : str + + Returns + ------- + OverwriteDiff + + Raises + ------ + discord.Forbidden + see `discord.abc.GuildChannel.set_permissions` + discord.NotFound + see `discord.abc.GuildChannel.set_permissions` + discord.HTTPException + see `discord.abc.GuildChannel.set_permissions` + NoChangeError + the edit was aborted due to no change + in permissions between initial and requested + """ + diff_to_apply = OverwriteDiff(denies_added=deny_value) + start = channel.overwrites_for(target) + new_overwrite = start + diff_to_apply + result_diff = OverwriteDiff.from_overwrites(before=start, after=new_overwrite) + + if not result_diff: + raise NoChangeError() from None + + await channel.set_permissions(target, overwrite=new_overwrite, reason=reason) + return result_diff + + @staticmethod + async def channel_unmute_from_diff( + *, + channel: discord.abc.GuildChannel, + target: Union[discord.Role, discord.Member], + diff: OverwriteDiff, + reason: Optional[str] = None, + ): + """ + Parameters + ---------- + channel : discord.abc.GuildChannel + target : Union[discord.Role, discord.Member] + diff : OverwriteDiff + The recorded difference from a prior mute to undo + reason : str + + Raises + ------ + discord.Forbidden + see `discord.abc.GuildChannel.set_permissions` + discord.NotFound + see `discord.abc.GuildChannel.set_permissions` + discord.HTTPException + see `discord.abc.GuildChannel.set_permissions` + NoChangeError + the edit was aborted due to no change + in permissions between initial and requested + """ + + start = channel.overwrites_for(target) + new_overwrite = start - diff + + if start == new_overwrite: + raise NoChangeError() + + await channel.set_permissions(target, overwrite=new_overwrite, reason=reason) + + async def do_command_server_mute( + self, + *, + ctx: commands.Context, + target: discord.Member, + duration: Optional[timedelta] = None, + reason: str, + ): + """ + This avoids duplicated logic with the option to use + the command group as one of the commands itself. + + Parameters + ---------- + ctx : commands.Context + The context the command was invoked in + target : discord.Member + The person to mute + duration : Optional[timedelta] + If provided, the amount of time to mute the user for + reason : str + The reason for the mute + + """ + raise NotImplementedError() # TODO + + async def apply_server_mute( + self, + *, + target: Optional[discord.Member] = None, + mod: discord.Member, + duration: Optional[timedelta], + reason: Optional[str] = None, + target_id: Optional[int] = None, + ): + """ + Applies a mute server wide + + Parameters + ---------- + target : Optional[discord.Member] + The member to be muted. This can only be omitted if ``target_id`` is supplied. + target_id : Optional[int] + The member id to mute. This can only be omitted if ``target`` is supplied. + mod : discord.Member + The responisble moderator + duration : Optional[timedelta] + If provided, the mute is considered temporary, and should be scheduled + for unmute after this period of time. + reason : Optional[str] + If provided, the reason for muting a user. + + This should be the reason from the moderator's perspective. + All formatting should take place here. + This should be less than 900 characters long. + Longer reasons will be truncated. + + Returns + ------- + ServerMuteResults + A class which contains the mute results + and some helpers for providing them to users. + + Raises + ------ + NoChangeError + If the server mute would result in zero changes. + ValueError + Raised if not given a target or target id, or if the target is not in the guild + PermError + Raised if we detect an invalid target or bot permissions. + This error will contain a user-friendly error message. + discord.Forbidden + This will only be raised for 2FA related forbiddens, + or if the bot's allowed permissions change mid operation. + discord.HTTPException + Sometimes the API gives these back without a reason. + """ + raise NotImplementedError() # TODO + + async def do_command_server_unmute( + self, *, ctx: commands.Context, target: discord.Member, reason: str + ): + """ + All actual command logic. + """ + raise NotImplementedError() # TODO + + async def do_command_channel_mute( + self, + *, + ctx: commands.Context, + target: discord.Member, + channel: discord.abc.GuildChannel, + duration: Optional[timedelta] = None, + reason: str, + ): + """ + All actual command logic. + """ + + async def do_command_channel_unmute( + self, + *, + ctx: commands.Context, + target: discord.Member, + channel: discord.abc.GuildChannel, + reason: str, + ): + """ + All actual command logic. + """ + raise NotImplementedError() # TODO + + @checks.admin_or_permissions(manage_guild=True) + @commands.group() + async def _muteset(self, ctx: commands.Context): + """ + Allows configuring [botname]'s mute behavior. + """ + pass + + @checks.mod() + @commands.group(name="mute") + @no_type_check + async def mute_group(self, ctx): + """ + Mutes users. + """ + pass + + @checks.mod() + @commands.group(name="tempmute") + @no_type_check + async def tempmute_group( + self, + ctx, + target: discord.Member = None, + duration: TimedeltaConverter = None, + *, + reason: str = None, + ): + """ + Mutes users, for some amount of time. + """ + pass + + @checks.mod() + @mute_group.command(name="channel") + @no_type_check + async def mute_channel(self, ctx, target: discord.Member, *, reason: str = ""): + """ + Mutes a user in the current channel. + """ + await self.do_command_channel_mute( + ctx=ctx, target=target, reason=reason, channel=ctx.channel, duration=None + ) + + @checks.mod() + @mute_group.command(name="server", aliases=["guild"]) + @no_type_check + async def mute_server(self, ctx, target: discord.Member, *, reason: str = ""): + """ + Mutes a user in the current server. + """ + await self.do_command_server_mute(ctx=ctx, target=target, reason=reason, duration=None) + + @checks.mod() + @tempmute_group.command(name="channel") + @no_type_check + async def tempmute_channel( + self, ctx, target: discord.Member, duration: TimedeltaConverter, *, reason: str = "" + ): + """ + Mutes a user in the current channel. + """ + await self.do_command_channel_mute( + ctx=ctx, target=target, reason=reason, channel=ctx.channel, duration=duration + ) + + @checks.mod() + @tempmute_group.command(name="server", aliases=["guild"]) + @no_type_check + async def tempmute_server( + self, ctx, target: discord.Member, duration: TimedeltaConverter, *, reason: str = "" + ): + """ + Mutes a user in the current server. + """ + await self.do_command_server_mute(ctx=ctx, target=target, reason=reason, duration=duration) diff --git a/redbot/cogs/mutes/utils.py b/redbot/cogs/mutes/utils.py new file mode 100644 index 000000000..2a7a7b374 --- /dev/null +++ b/redbot/cogs/mutes/utils.py @@ -0,0 +1,53 @@ +import discord +from redbot.core.i18n import Translator + +from .errors import PermError + +_ = Translator("Mutes", __file__) + + +def ngettext(singular: str, plural: str, count: int, **fmt_kwargs) -> str: + """ + This isn't a full ngettext. + + Replace this with babel when Red can use that. + """ + return singular.format(**fmt_kwargs) if count == 1 else plural.format(**fmt_kwargs) + + +def hierarchy_check(*, mod: discord.Member, target: discord.Member): + """ + Checks that things are hierarchy safe. + + This does not check the bot can modify permissions. + This is assumed to be checked prior to command invocation. + + Parameters + ----------- + mod : discord.Member + The responsible moderator + target : discord.Member + The target of a mute + + Raises + ------ + PermError + Any of: + - The target is above either the mod or bot. + - The target had the administrator perm + - The target is the guild owner + This error will contain a user facing error message. + """ + if target == target.guild.owner: + raise PermError(friendly_error=_("You can't mute the owner of a guild.")) + + if target.guild_permissions.administrator: + raise PermError( + friendly_error=_("You can't mute someone with the administrator permission.") + ) + + if target.top_role >= target.guild.me: + raise PermError(friendly_error=_("I can't mute this user. (Discord Hierarchy applies)")) + + if target.top_role >= mod.top_role: + raise PermError(friendly_error=_("You can't mute this user. (Discord Hierarchy applies)")) diff --git a/redbot/core/utils/discord_helpers.py b/redbot/core/utils/discord_helpers.py new file mode 100644 index 000000000..639306b62 --- /dev/null +++ b/redbot/core/utils/discord_helpers.py @@ -0,0 +1,152 @@ +import discord +from typing import Dict + +__all__ = ["OverwriteDiff"] + + +class OverwriteDiff: + """ + Represents a change in PermissionOverwrites. + + All math operations done with the values contained are bitwise. + + This object is considered False for boolean logic when representing no change. + + Attributes + ---------- + allows_added : int + allows_removed : int + denies_added : int + denies_removed : int + """ + + def __init__(self, **data: int): + self.allows_added = data.pop("allows_added", 0) + self.allows_removed = data.pop("allows_removed", 0) + self.denies_added = data.pop("denies_added", 0) + self.denies_removed = data.pop("denies_removed", 0) + + if ( + (self.allows_added & self.denies_added) + or (self.allows_removed & self.denies_removed) + or (self.allows_added & self.allows_removed) + or (self.denies_added & self.denies_removed) + ): + raise ValueError( + "It is impossible for this to be the difference of two valid overwrite objects." + ) + + def __repr__(self): + return ( + f"" + ) + + def __bool__(self): + return self.allows_added or self.allows_removed or self.denies_added or self.denies_removed + + def to_dict(self) -> Dict[str, int]: + return { + "allows_added": self.allows_added, + "allows_removed": self.allows_removed, + "denies_added": self.denies_added, + "denies_removed": self.denies_removed, + } + + def __radd__(self, other: discord.PermissionOverwrite) -> discord.PermissionOverwrite: + if not isinstance(other, discord.PermissionOverwrite): + return NotImplemented + return self.apply_to_overwirte(other) + + def __rsub__(self, other: discord.PermissionOverwrite) -> discord.PermissionOverwrite: + if not isinstance(other, discord.PermissionOverwrite): + return NotImplemented + return self.remove_from_overwrite(other) + + @classmethod + def from_dict(cls, data: Dict[str, int]): + return cls(**data) + + @classmethod + def from_overwrites( + cls, before: discord.PermissionOverwrite, after: discord.PermissionOverwrite + ): + """ + Returns the difference between two permission overwrites. + + Parameters + ---------- + before : discord.PermissionOverwrite + after : discord.PermissionOverwrite + """ + + b_allow, b_deny = before.pair() + a_allow, a_deny = after.pair() + + b_allow_val, b_deny_val = b_allow.value, b_deny.value + a_allow_val, a_deny_val = a_allow.value, a_deny.value + + allows_added = a_allow_val & ~b_allow_val + allows_removed = b_allow_val & ~a_allow_val + + denies_added = a_deny_val & ~b_deny_val + denies_removed = b_deny_val & ~a_deny_val + + return cls( + allows_added=allows_added, + allows_removed=allows_removed, + denies_added=denies_added, + denies_removed=denies_removed, + ) + + def apply_to_overwirte( + self, overwrite: discord.PermissionOverwrite + ) -> discord.PermissionOverwrite: + """ + Creates a new overwrite by applying a diff to existing overwrites. + + Parameters + ---------- + overwrite : discord.PermissionOverwrite + + Returns + ------- + discord.PermissionOverwrite + A new overwrite object with the diff applied to it. + """ + + current_allow, current_deny = overwrite.pair() + + allow_value = (current_allow.value | self.allows_added) & ~self.allows_removed + deny_value = (current_deny.value | self.denies_added) & ~self.denies_removed + + na = discord.Permissions(allow_value) + nd = discord.Permissions(deny_value) + return discord.PermissionOverwrite.from_pair(na, nd) + + def remove_from_overwrite( + self, overwrite: discord.PermissionOverwrite + ) -> discord.PermissionOverwrite: + """ + If given the after for the current diff object, this should return the before. + + This can be used to roll back changes. + + Parameters + ---------- + overwrite : discord.PermissionOverwrite + + Returns + ------- + discord.PermissionOverwrite + A new overwrite object with the diff removed from it. + """ + current_allow, current_deny = overwrite.pair() + + allow_value = (current_allow.value | self.allows_removed) & ~self.allows_added + deny_value = (current_deny.value | self.denies_removed) & ~self.denies_added + + na = discord.Permissions(allow_value) + nd = discord.Permissions(deny_value) + return discord.PermissionOverwrite.from_pair(na, nd)