diff --git a/docs/cog_customcom.rst b/docs/cog_customcom.rst index d977ff5d2..83cd3eda6 100644 --- a/docs/cog_customcom.rst +++ b/docs/cog_customcom.rst @@ -12,6 +12,14 @@ CustomCommands allows you to create simple commands for your bot without requiri If the command you attempt to create shares a name with an already loaded command, you cannot overwrite it with this cog. +--------- +Cooldowns +--------- + +You can set cooldowns for your custom commands. If a command is on cooldown, it will not be triggered. + +You can set cooldowns per member or per channel, or set a cooldown guild-wide. You can also set multiple types of cooldown on a single custom command. All cooldowns must pass before the command will trigger. + ------------------ Context Parameters ------------------ diff --git a/redbot/cogs/customcom/customcom.py b/redbot/cogs/customcom/customcom.py index bf4f1940c..f2f903291 100644 --- a/redbot/cogs/customcom/customcom.py +++ b/redbot/cogs/customcom/customcom.py @@ -1,7 +1,7 @@ import os import re import random -from datetime import datetime +from datetime import datetime, timedelta from inspect import Parameter from collections import OrderedDict from typing import Mapping @@ -19,10 +19,6 @@ class CCError(Exception): pass -class NotFound(CCError): - pass - - class AlreadyExists(CCError): pass @@ -31,6 +27,14 @@ class ArgParseError(CCError): pass +class NotFound(CCError): + pass + + +class OnCooldown(CCError): + pass + + class CommandObj: def __init__(self, **kwargs): config = kwargs.get("config") @@ -88,9 +92,9 @@ class CommandObj: if not ccinfo: raise NotFound() else: - return ccinfo["response"] + return ccinfo["response"], ccinfo.get("cooldowns", {}) - async def create(self, ctx: commands.Context, command: str, response): + async def create(self, ctx: commands.Context, command: str, *, response): """Create a custom command""" # Check if this command is already registered as a customcommand if await self.db(ctx.guild).commands.get_raw(command, default=None): @@ -101,25 +105,35 @@ class CommandObj: ccinfo = { "author": {"id": author.id, "name": author.name}, "command": command, + "cooldowns": {}, "created_at": self.get_now(), "editors": [], "response": response, } await self.db(ctx.guild).commands.set_raw(command, value=ccinfo) - async def edit(self, ctx: commands.Context, command: str, response: None): + async def edit( + self, + ctx: commands.Context, + command: str, + *, + response=None, + cooldowns: Mapping[str, int] = None, + ask_for: bool = True + ): """Edit an already existing custom command""" + ccinfo = await self.db(ctx.guild).commands.get_raw(command, default=None) + # Check if this command is registered - if not await self.db(ctx.guild).commands.get_raw(command, default=None): + if not ccinfo: raise NotFound() author = ctx.message.author - ccinfo = await self.db(ctx.guild).commands.get_raw(command, default=None) def check(m): - return m.channel == ctx.channel and m.author == ctx.message.author + return m.channel == ctx.channel and m.author == ctx.author - if not response: + if ask_for and not response: await ctx.send(_("Do you want to create a 'randomized' cc? {}").format("y/n")) msg = await self.bot.wait_for("message", check=check) @@ -129,17 +143,24 @@ class CommandObj: await ctx.send(_("What response do you want?")) response = (await self.bot.wait_for("message", check=check)).content - # test to raise - ctx.cog.prepare_args(response if isinstance(response, str) else response[0]) + if response: + # test to raise + ctx.cog.prepare_args(response if isinstance(response, str) else response[0]) + ccinfo["response"] = response - ccinfo["response"] = response - ccinfo["edited_at"] = self.get_now() + if cooldowns: + ccinfo.setdefault("cooldowns", {}).update(cooldowns) + for key, value in ccinfo["cooldowns"].copy().items(): + if value <= 0: + del ccinfo["cooldowns"][key] if author.id not in ccinfo["editors"]: # Add the person who invoked the `edit` coroutine to the list of # editors, if the person is not yet in there ccinfo["editors"].append(author.id) + ccinfo["edited_at"] = self.get_now() + await self.db(ctx.guild).commands.set_raw(command, value=ccinfo) async def delete(self, ctx: commands.Context, command: str): @@ -162,6 +183,7 @@ class CustomCommands: self.config = Config.get_conf(self, self.key) self.config.register_guild(commands={}) self.commandobj = CommandObj(config=self.config, bot=self.bot) + self.cooldowns = {} @commands.group(aliases=["cc"]) @commands.guild_only() @@ -182,7 +204,7 @@ class CustomCommands: @cc_add.command(name="random") @checks.mod_or_permissions(administrator=True) - async def cc_add_random(self, ctx: commands.Context, command: str): + async def cc_add_random(self, ctx: commands.Context, command: str.lower): """ Create a CC where it will randomly choose a response! @@ -205,13 +227,12 @@ class CustomCommands: @cc_add.command(name="simple") @checks.mod_or_permissions(administrator=True) - async def cc_add_simple(self, ctx, command: str, *, text): + async def cc_add_simple(self, ctx, command: str.lower, *, text: str): """Adds a simple custom command Example: [p]customcom add simple yourcommand Text you want """ - command = command.lower() if command in self.bot.all_commands: await ctx.send(_("That command is already a standard command.")) return @@ -227,16 +248,69 @@ class CustomCommands: except ArgParseError as e: await ctx.send(e.args[0]) + @customcom.command(name="cooldown") + @checks.mod_or_permissions(administrator=True) + async def cc_cooldown( + self, ctx, command: str.lower, cooldown: int = None, *, per: str.lower = "member" + ): + """ + Sets, edits, or views cooldowns for a custom command + + You may set cooldowns per member, channel, or guild. + Multiple cooldowns may be set. All cooldowns must be cooled to call the custom command. + Example: + [p]customcom cooldown yourcommand 30 + """ + if cooldown is None: + try: + cooldowns = (await self.commandobj.get(ctx.message, command))[1] + except NotFound: + return await ctx.send(_("That command doesn't exist.")) + if cooldowns: + cooldown = [] + for per, rate in cooldowns.items(): + cooldown.append( + _("A {} may call this command every {} seconds").format(per, rate) + ) + return await ctx.send("\n".join(cooldown)) + else: + return await ctx.send(_("This command has no cooldown.")) + per = {"server": "guild", "user": "member"}.get(per, per) + allowed = ("guild", "member", "channel") + if per not in allowed: + return await ctx.send(_("{} must be one of {}").format("per", ", ".join(allowed))) + cooldown = {per: cooldown} + try: + await self.commandobj.edit(ctx=ctx, command=command, cooldowns=cooldown, ask_for=False) + await ctx.send(_("Custom command cooldown successfully edited.")) + except NotFound: + await ctx.send( + _("That command doesn't exist. Use `{}` to add it.").format( + "{}customcom add".format(ctx.prefix) + ) + ) + + @customcom.command(name="delete") + @checks.mod_or_permissions(administrator=True) + async def cc_delete(self, ctx, command: str.lower): + """Deletes a custom command + + Example: + [p]customcom delete yourcommand""" + try: + await self.commandobj.delete(ctx=ctx, command=command) + await ctx.send(_("Custom command successfully deleted.")) + except NotFound: + await ctx.send(_("That command doesn't exist.")) + @customcom.command(name="edit") @checks.mod_or_permissions(administrator=True) - async def cc_edit(self, ctx, command: str, *, text=None): - """Edits a custom command + async def cc_edit(self, ctx, command: str.lower, *, text: str = None): + """Edits a custom command's response Example: [p]customcom edit yourcommand Text you want """ - command = command.lower() - try: await self.commandobj.edit(ctx=ctx, command=command, response=text) await ctx.send(_("Custom command successfully edited.")) @@ -249,19 +323,6 @@ class CustomCommands: except ArgParseError as e: await ctx.send(e.args[0]) - @customcom.command(name="delete") - @checks.mod_or_permissions(administrator=True) - async def cc_delete(self, ctx, command: str): - """Deletes a custom command - Example: - [p]customcom delete yourcommand""" - command = command.lower() - try: - await self.commandobj.delete(ctx=ctx, command=command) - await ctx.send(_("Custom command successfully deleted.")) - except NotFound: - await ctx.send(_("That command doesn't exist.")) - @customcom.command(name="list") async def cc_list(self, ctx): """Shows custom commands list""" @@ -302,8 +363,8 @@ class CustomCommands: # user_allowed check, will be replaced with self.bot.user_allowed or # something similar once it's added - user_allowed = True + if len(message.content) < 2 or is_private or not user_allowed or message.author.bot: return @@ -313,22 +374,25 @@ class CustomCommands: return try: - raw_response = await self.commandobj.get(message=message, command=ctx.invoked_with) + raw_response, cooldowns = await self.commandobj.get( + message=message, command=ctx.invoked_with + ) if isinstance(raw_response, list): raw_response = random.choice(raw_response) elif isinstance(raw_response, str): pass else: raise NotFound() - except NotFound: + if cooldowns: + self.test_cooldowns(ctx, ctx.invoked_with, cooldowns) + except CCError: return - await self.call_cc_command(ctx, raw_response, message) - async def call_cc_command(self, ctx, raw_response, message) -> None: # wrap the command here so it won't register with the bot fake_cc = commands.Command(ctx.invoked_with, self.cc_callback) fake_cc.params = self.prepare_args(raw_response) ctx.command = fake_cc + await self.bot.invoke(ctx) if not ctx.command_failed: await self.cc_command(*ctx.args, **ctx.kwargs, raw_response=raw_response) @@ -429,6 +493,25 @@ class CustomCommands: fin = default + [(p.name, p) for p in fin] return OrderedDict(fin) + def test_cooldowns(self, ctx, command, cooldowns): + now = datetime.utcnow() + new_cooldowns = {} + for per, rate in cooldowns.items(): + if per == "guild": + key = (command, ctx.guild) + elif per == "channel": + key = (command, ctx.guild, ctx.channel) + elif per == "member": + key = (command, ctx.guild, ctx.author) + cooldown = self.cooldowns.get(key) + if cooldown: + cooldown += timedelta(seconds=rate) + if cooldown > now: + raise OnCooldown() + new_cooldowns[key] = now + # only update cooldowns if the command isn't on cooldown + self.cooldowns.update(new_cooldowns) + def transform_arg(self, result, attr, obj) -> str: attr = attr[1:] # strip initial dot if not attr: