From 4ddd5763158c6781f19ad307af1db11902837cb8 Mon Sep 17 00:00:00 2001 From: Will Date: Sat, 14 Oct 2017 18:05:08 -0400 Subject: [PATCH] [V3 Admin] Rewrite of Squid-Plugins Admin cog (#825) * Add/remove roles * Announcement * Edit role stuff A bit of refactoring * Selfrole stuff * Add announce ignore capabilities * announce configurations Announcement fixes * Serverlock initial commit * Add some admin tests better test * Update for new config * Add user hierarchy checks * Fix tests * Update from rebase * Fix config getter * Fix async getters/selfrole --- redbot/cogs/admin/__init__.py | 5 + redbot/cogs/admin/admin.py | 389 ++++++++++++++++++++++++++++++++ redbot/cogs/admin/announcer.py | 71 ++++++ redbot/cogs/admin/converters.py | 33 +++ tests/cogs/admin/__init__.py | 0 tests/cogs/admin/test_admin.py | 57 +++++ tests/conftest.py | 7 + 7 files changed, 562 insertions(+) create mode 100644 redbot/cogs/admin/__init__.py create mode 100644 redbot/cogs/admin/admin.py create mode 100644 redbot/cogs/admin/announcer.py create mode 100644 redbot/cogs/admin/converters.py create mode 100644 tests/cogs/admin/__init__.py create mode 100644 tests/cogs/admin/test_admin.py diff --git a/redbot/cogs/admin/__init__.py b/redbot/cogs/admin/__init__.py new file mode 100644 index 000000000..8db8d7927 --- /dev/null +++ b/redbot/cogs/admin/__init__.py @@ -0,0 +1,5 @@ +from .admin import Admin + + +def setup(bot): + bot.add_cog(Admin()) diff --git a/redbot/cogs/admin/admin.py b/redbot/cogs/admin/admin.py new file mode 100644 index 000000000..182948a21 --- /dev/null +++ b/redbot/cogs/admin/admin.py @@ -0,0 +1,389 @@ +from typing import Tuple + +import discord +from discord.ext import commands + +from redbot.core import Config, checks + +import logging + +from redbot.core.utils.chat_formatting import box +from .announcer import Announcer +from .converters import MemberDefaultAuthor, SelfRole + +log = logging.getLogger("red.admin") + +GENERIC_FORBIDDEN = ( + "I attempted to do something that Discord denied me permissions for." + " Your command failed to successfully complete." +) + +HIERARCHY_ISSUE = ( + "I tried to add {role.name} to {member.display_name} but that role" + " is higher than my highest role in the Discord heirarchy so I was" + " unable to successfully add it. Please give me a higher role and " + "try again." +) + +USER_HIERARCHY_ISSUE = ( + "I tried to add {role.name} to {member.display_name} but that role" + " is higher than your highest role in the Discord heirarchy so I was" + " unable to successfully add it. Please get a higher role and " + "try again." +) + +RUNNING_ANNOUNCEMENT = ( + "I am already announcing something. If you would like to make a" + " different announcement please use `{prefix}announce cancel`" + " first." +) + + +class Admin: + def __init__(self, config=Config): + self.conf = config.get_conf(self, 8237492837454039, + force_registration=True) + + self.conf.register_global( + serverlocked=False + ) + + self.conf.register_guild( + announce_ignore=False, + announce_channel=None, # Integer ID + selfroles=[] # List of integer ID's + ) + + self.__current_announcer = None + + def __unload(self): + try: + self.__current_announcer.cancel() + except AttributeError: + pass + + @staticmethod + async def complain(ctx: commands.Context, message: str, + **kwargs): + await ctx.send(message.format(**kwargs)) + + def is_announcing(self) -> bool: + """ + Is the bot currently announcing something? + :return: + """ + if self.__current_announcer is None: + return False + + return self.__current_announcer.active or False + + @staticmethod + def pass_heirarchy_check(ctx: commands.Context, + role: discord.Role) -> bool: + """ + Determines if the bot has a higher role than the given one. + :param ctx: + :param role: Role object. + :return: + """ + return ctx.guild.me.top_role > role + + @staticmethod + def pass_user_heirarchy_check(ctx: commands.Context, + role: discord.Role) -> bool: + """ + Determines if a user is allowed to add/remove/edit the given role. + :param ctx: + :param role: + :return: + """ + return ctx.author.top_role > role + + async def _addrole(self, ctx: commands.Context, member: discord.Member, + role: discord.Role): + try: + await member.add_roles(role) + except discord.Forbidden: + if not self.pass_heirarchy_check(ctx, role): + await self.complain(ctx, HIERARCHY_ISSUE, role=role, + member=member) + else: + await self.complain(ctx, GENERIC_FORBIDDEN) + else: + await ctx.send("I successfully added {role.name} to" + " {member.display_name}".format( + role=role, member=member + )) + + async def _removerole(self, ctx: commands.Context, member: discord.Member, + role: discord.Role): + try: + await member.remove_roles(role) + except discord.Forbidden: + if not self.pass_heirarchy_check(ctx, role): + await self.complain(ctx, HIERARCHY_ISSUE, role=role, + member=member) + else: + await self.complain(ctx, GENERIC_FORBIDDEN) + else: + await ctx.send("I successfully removed {role.name} from" + " {member.display_name}".format( + role=role, member=member + )) + + @commands.command() + @commands.guild_only() + @checks.admin_or_permissions(manage_roles=True) + async def addrole(self, ctx: commands.Context, rolename: discord.Role, + user: MemberDefaultAuthor=None): + """ + Adds a role to a user. If user is left blank it defaults to the + author of the command. + """ + if self.pass_user_heirarchy_check(ctx, rolename): + # noinspection PyTypeChecker + await self._addrole(ctx, user, rolename) + else: + await self.complain(ctx, USER_HIERARCHY_ISSUE) + + @commands.command() + @commands.guild_only() + @checks.admin_or_permissions(manage_roles=True) + async def removerole(self, ctx: commands.Context, rolename: discord.Role, + user: MemberDefaultAuthor=None): + """ + Removes a role from a user. If user is left blank it defaults to the + author of the command. + """ + if self.pass_user_heirarchy_check(ctx, rolename): + # noinspection PyTypeChecker + await self._removerole(ctx, user, rolename) + else: + await self.complain(ctx, USER_HIERARCHY_ISSUE) + + @commands.group() + @commands.guild_only() + @checks.admin_or_permissions(manage_roles=True) + async def editrole(self, ctx: commands.Context): + """Edits roles settings""" + if ctx.invoked_subcommand is None: + await ctx.bot.send_cmd_help(ctx) + + @editrole.command(name="colour", aliases=["color", ]) + async def editrole_colour(self, ctx: commands.Context, role: discord.Role, + value: discord.Colour): + """Edits a role's colour + + Use double quotes if the role contains spaces. + Colour must be in hexadecimal format. + \"http://www.w3schools.com/colors/colors_picker.asp\" + Examples: + !editrole colour \"The Transistor\" #ff0000 + !editrole colour Test #ff9900""" + author = ctx.author + reason = "{}({}) changed the colour of role '{}'".format( + author.name, author.id, role.name) + + if not self.pass_user_heirarchy_check(ctx, role): + await self.complain(ctx, USER_HIERARCHY_ISSUE) + return + + try: + await role.edit(reason=reason, color=value) + except discord.Forbidden: + await self.complain(ctx, GENERIC_FORBIDDEN) + else: + log.info(reason) + await ctx.send("Done.") + + @editrole.command(name="name") + @checks.admin_or_permissions(administrator=True) + async def edit_role_name(self, ctx: commands.Context, role: discord.Role, *, name: str): + """Edits a role's name + + Use double quotes if the role or the name contain spaces. + Examples: + !editrole name \"The Transistor\" Test""" + author = ctx.message.author + old_name = role.name + reason = "{}({}) changed the name of role '{}' to '{}'".format( + author.name, author.id, old_name, name) + + if not self.pass_user_heirarchy_check(ctx, role): + await self.complain(ctx, USER_HIERARCHY_ISSUE) + return + + try: + await role.edit(reason=reason, name=name) + except discord.Forbidden: + await self.complain(ctx, GENERIC_FORBIDDEN) + else: + log.info(reason) + await ctx.send("Done.") + + @commands.group(invoke_without_command=True) + @checks.is_owner() + async def announce(self, ctx: commands.Context, message: str): + """ + Announces a message to all servers the bot is in. + """ + if not self.is_announcing(): + announcer = Announcer(ctx, message, config=self.conf) + announcer.start() + + self.__current_announcer = announcer + + await ctx.send("The announcement has begun.") + else: + prefix = ctx.prefix + await self.complain(ctx, RUNNING_ANNOUNCEMENT, + prefix=prefix) + + @announce.command(name="cancel") + @checks.is_owner() + async def announce_cancel(self, ctx): + """ + Cancels a running announce. + """ + try: + self.__current_announcer.cancel() + except AttributeError: + pass + + await ctx.send("The current announcement has been cancelled.") + + @announce.command(name="channel") + @commands.guild_only() + @checks.guildowner_or_permissions(administrator=True) + async def announce_channel(self, ctx, channel: discord.TextChannel=None): + """ + Changes the channel on which the bot makes announcements. + """ + if channel is None: + channel = ctx.channel + await self.conf.guild(ctx.guild).set("announce_channel", channel.id) + + await ctx.send("The announcement channel has been set to {}".format( + channel.mention + )) + + @announce.command(name="ignore") + @commands.guild_only() + @checks.guildowner_or_permissions(administrator=True) + async def announce_ignore(self, ctx, guild: discord.Guild=None): + """ + Toggles whether the announcements will ignore the given server. + Defaults to the current server if none is provided. + """ + if guild is None: + guild = ctx.guild + + ignored = await self.conf.guild(guild).announce_ignore() + await self.conf.guild(guild).announce_ignore.set(not ignored) + + verb = "will" if ignored else "will not" + + await ctx.send("The server {} {} receive announcements.".format( + guild.name, verb + )) + + async def _valid_selfroles(self, guild: discord.Guild) -> Tuple[discord.Role]: + """ + Returns a list of valid selfroles + :param guild: + :return: + """ + selfrole_ids = set(await self.conf.guild(guild).selfroles()) + guild_roles = guild.roles + + valid_roles = tuple(r for r in guild_roles if r.id in selfrole_ids) + valid_role_ids = set(r.id for r in valid_roles) + + if selfrole_ids != valid_role_ids: + await self.conf.guild(guild).selfroles.set(valid_role_ids) + + # noinspection PyTypeChecker + return valid_roles + + @commands.group(invoke_without_command=True) + async def selfrole(self, ctx: commands.Context, selfrole: SelfRole): + """ + Add a role to yourself that server admins have configured as + user settable. + """ + # noinspection PyTypeChecker + await self._addrole(ctx, ctx.author, selfrole) + + @selfrole.command(name="remove") + async def selfrole_remove(self, ctx: commands.Context, selfrole: SelfRole): + """ + Removes a selfrole from yourself. + """ + # noinspection PyTypeChecker + await self._removerole(ctx, ctx.author, selfrole) + + @selfrole.command(name="add") + @commands.has_permissions(manage_roles=True) + async def selfrole_add(self, ctx: commands.Context, role: discord.Role): + """ + Add a role to the list of available selfroles. + """ + curr_selfroles = await self.conf.guild(ctx.guild).selfroles() + if role.id not in curr_selfroles: + curr_selfroles.append(role.id) + await self.conf.guild(ctx.guild).selfroles.set(curr_selfroles) + + await ctx.send("The selfroles list has been successfully modified.") + + @selfrole.command(name="delete") + @commands.has_permissions(manage_roles=True) + async def selfrole_delete(self, ctx: commands.Context, role: SelfRole): + """ + Removes a role from the list of available selfroles. + """ + curr_selfroles = await self.conf.guild(ctx.guild).selfroles() + curr_selfroles.remove(role.id) + await self.conf.guild(ctx.guild).selfroles.set(curr_selfroles) + + await ctx.send("The selfroles list has been successfully modified.") + + @selfrole.command(name="list") + async def selfrole_list(self, ctx: commands.Context): + """ + Lists all available selfroles. + """ + selfroles = await self._valid_selfroles(ctx.guild) + fmt_selfroles = "\n".join(["+ " + r.name for r in selfroles]) + + msg = "Available Selfroles:\n{}".format(fmt_selfroles) + await ctx.send(box(msg, "diff")) + + async def _serverlock_check(self, guild: discord.Guild) -> bool: + """ + Checks if serverlocked is enabled. + :param guild: + :return: True if locked and left server + """ + if await self.conf.serverlocked(): + await guild.leave() + return True + return False + + @commands.command() + @checks.is_owner() + async def serverlock(self, ctx: commands.Context): + """ + Locks a bot to it's current servers only. + """ + serverlocked = await self.conf.serverlocked() + await self.conf.serverlocked.set(not serverlocked) + + verb = "is now" if not serverlocked else "is no longer" + + await ctx.send("The bot {} serverlocked.".format(verb)) + +# region Event Handlers + async def on_guild_join(self, guild: discord.Guild): + if await self._serverlock_check(guild): + return +# endregion diff --git a/redbot/cogs/admin/announcer.py b/redbot/cogs/admin/announcer.py new file mode 100644 index 000000000..377ddb222 --- /dev/null +++ b/redbot/cogs/admin/announcer.py @@ -0,0 +1,71 @@ +import asyncio + +import discord +from discord.ext import commands + + +class Announcer: + def __init__(self, ctx: commands.Context, + message: str, + config=None): + """ + :param ctx: + :param message: + :param config: Used to determine channel overrides + """ + self.ctx = ctx + self.message = message + self.config = config + + self.active = None + + def start(self): + """ + Starts an announcement. + :return: + """ + if self.active is None: + self.active = True + self.ctx.bot.loop.create_task(self.announcer()) + + def cancel(self): + """ + Cancels a running announcement. + :return: + """ + self.active = False + + async def _get_announce_channel(self, guild: discord.Guild) -> discord.TextChannel: + channel_id = await self.config.guild(guild).announce_channel() + channel = None + + if channel_id is not None: + channel = guild.get_channel(channel_id) + + if channel is None: + channel = guild.default_channel + + return channel + + async def announcer(self): + guild_list = self.ctx.bot.guilds + bot_owner = (await self.ctx.bot.application_info()).owner + for g in guild_list: + if not self.active: + return + + if await self.config.guild(g).announce_ignore(): + continue + + channel = await self._get_announce_channel(g) + + try: + await channel.send(self.message) + except discord.Forbidden: + await bot_owner.send("I could not announce to guild: {}".format( + g.id + )) + await asyncio.sleep(0.5) + + self.active = False + diff --git a/redbot/cogs/admin/converters.py b/redbot/cogs/admin/converters.py new file mode 100644 index 000000000..ca12b9734 --- /dev/null +++ b/redbot/cogs/admin/converters.py @@ -0,0 +1,33 @@ +import discord +from discord.ext import commands + + +class MemberDefaultAuthor(commands.Converter): + async def convert(self, ctx: commands.Context, arg: str) -> discord.Member: + member_converter = commands.MemberConverter() + try: + member = await member_converter.convert(ctx, arg) + except commands.BadArgument: + if arg.strip() != "": + raise + else: + member = ctx.author + return member + + +class SelfRole(commands.Converter): + async def convert(self, ctx: commands.Context, arg: str) -> discord.Role: + admin = ctx.command.instance + if admin is None: + raise commands.BadArgument("Admin is not loaded.") + + conf = admin.conf + selfroles = await conf.guild(ctx.guild).selfroles() + + role_converter = commands.RoleConverter() + role = await role_converter.convert(ctx, arg) + + if role.id not in selfroles: + raise commands.BadArgument("The provided role is not a valid" + " selfrole.") + return role diff --git a/tests/cogs/admin/__init__.py b/tests/cogs/admin/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/cogs/admin/test_admin.py b/tests/cogs/admin/test_admin.py new file mode 100644 index 000000000..e1f5ed2e8 --- /dev/null +++ b/tests/cogs/admin/test_admin.py @@ -0,0 +1,57 @@ +from unittest.mock import MagicMock + +import pytest + +from redbot.cogs.admin import Admin +from redbot.cogs.admin.announcer import Announcer + + +@pytest.fixture() +def admin(config): + return Admin(config) + + +@pytest.fixture() +def announcer(admin): + a = Announcer(MagicMock(), "Some message", admin.conf) + yield a + a.cancel() + + +@pytest.mark.asyncio +async def test_serverlock_check(admin, coroutine): + await admin.conf.serverlocked.set(True) + guild = MagicMock() + guild.leave = coroutine + + # noinspection PyProtectedMember + ret = await admin._serverlock_check(guild) + + assert ret is True + + +def test_announcer_initial_state(announcer): + assert announcer.active is None + + +def test_announcer_start(announcer): + announcer.announcer = object + announcer.start() + + assert announcer.ctx.bot.loop.create_task.called + assert announcer.active is True + + +@pytest.mark.asyncio +async def test_announcer_ignore(announcer, empty_guild, empty_channel): + await announcer.config.guild(empty_guild).announce_channel.set(empty_channel.id) + + guild = MagicMock() + guild.id = empty_guild.id + + guild.get_channel.return_value = empty_channel + + ret = await announcer._get_announce_channel(guild) + + assert guild.get_channel.called + assert ret == empty_channel diff --git a/tests/conftest.py b/tests/conftest.py index e5ae4557c..8e55c90c7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,6 +24,13 @@ def override_data_path(tmpdir): data_manager.basic_config['DATA_PATH'] = str(tmpdir) +@pytest.fixture() +def coroutine(): + async def some_coro(*args, **kwargs): + return args, kwargs + return some_coro + + @pytest.fixture() def json_driver(tmpdir_factory): import uuid