mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-07 11:48:55 -05:00
[V3 CustomCommands] Cooldowns (#2124)
* customcom cooldowns allows you to set multiple different cooldowns for custom commands * black formatting * [docs] cooldowns
This commit is contained in:
parent
113b97b9c9
commit
61652a0306
@ -12,6 +12,14 @@ CustomCommands allows you to create simple commands for your bot without requiri
|
|||||||
|
|
||||||
If the command you attempt to create shares a name with an already loaded command, you cannot overwrite it with this cog.
|
If the command you attempt to create shares a name with an already loaded command, you cannot overwrite it with this cog.
|
||||||
|
|
||||||
|
---------
|
||||||
|
Cooldowns
|
||||||
|
---------
|
||||||
|
|
||||||
|
You can set cooldowns for your custom commands. If a command is on cooldown, it will not be triggered.
|
||||||
|
|
||||||
|
You can set cooldowns per member or per channel, or set a cooldown guild-wide. You can also set multiple types of cooldown on a single custom command. All cooldowns must pass before the command will trigger.
|
||||||
|
|
||||||
------------------
|
------------------
|
||||||
Context Parameters
|
Context Parameters
|
||||||
------------------
|
------------------
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import random
|
import random
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
from inspect import Parameter
|
from inspect import Parameter
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from typing import Mapping
|
from typing import Mapping
|
||||||
@ -19,10 +19,6 @@ class CCError(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class NotFound(CCError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class AlreadyExists(CCError):
|
class AlreadyExists(CCError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -31,6 +27,14 @@ class ArgParseError(CCError):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class NotFound(CCError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class OnCooldown(CCError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class CommandObj:
|
class CommandObj:
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
config = kwargs.get("config")
|
config = kwargs.get("config")
|
||||||
@ -88,9 +92,9 @@ class CommandObj:
|
|||||||
if not ccinfo:
|
if not ccinfo:
|
||||||
raise NotFound()
|
raise NotFound()
|
||||||
else:
|
else:
|
||||||
return ccinfo["response"]
|
return ccinfo["response"], ccinfo.get("cooldowns", {})
|
||||||
|
|
||||||
async def create(self, ctx: commands.Context, command: str, response):
|
async def create(self, ctx: commands.Context, command: str, *, response):
|
||||||
"""Create a custom command"""
|
"""Create a custom command"""
|
||||||
# Check if this command is already registered as a customcommand
|
# Check if this command is already registered as a customcommand
|
||||||
if await self.db(ctx.guild).commands.get_raw(command, default=None):
|
if await self.db(ctx.guild).commands.get_raw(command, default=None):
|
||||||
@ -101,25 +105,35 @@ class CommandObj:
|
|||||||
ccinfo = {
|
ccinfo = {
|
||||||
"author": {"id": author.id, "name": author.name},
|
"author": {"id": author.id, "name": author.name},
|
||||||
"command": command,
|
"command": command,
|
||||||
|
"cooldowns": {},
|
||||||
"created_at": self.get_now(),
|
"created_at": self.get_now(),
|
||||||
"editors": [],
|
"editors": [],
|
||||||
"response": response,
|
"response": response,
|
||||||
}
|
}
|
||||||
await self.db(ctx.guild).commands.set_raw(command, value=ccinfo)
|
await self.db(ctx.guild).commands.set_raw(command, value=ccinfo)
|
||||||
|
|
||||||
async def edit(self, ctx: commands.Context, command: str, response: None):
|
async def edit(
|
||||||
|
self,
|
||||||
|
ctx: commands.Context,
|
||||||
|
command: str,
|
||||||
|
*,
|
||||||
|
response=None,
|
||||||
|
cooldowns: Mapping[str, int] = None,
|
||||||
|
ask_for: bool = True
|
||||||
|
):
|
||||||
"""Edit an already existing custom command"""
|
"""Edit an already existing custom command"""
|
||||||
|
ccinfo = await self.db(ctx.guild).commands.get_raw(command, default=None)
|
||||||
|
|
||||||
# Check if this command is registered
|
# Check if this command is registered
|
||||||
if not await self.db(ctx.guild).commands.get_raw(command, default=None):
|
if not ccinfo:
|
||||||
raise NotFound()
|
raise NotFound()
|
||||||
|
|
||||||
author = ctx.message.author
|
author = ctx.message.author
|
||||||
ccinfo = await self.db(ctx.guild).commands.get_raw(command, default=None)
|
|
||||||
|
|
||||||
def check(m):
|
def check(m):
|
||||||
return m.channel == ctx.channel and m.author == ctx.message.author
|
return m.channel == ctx.channel and m.author == ctx.author
|
||||||
|
|
||||||
if not response:
|
if ask_for and not response:
|
||||||
await ctx.send(_("Do you want to create a 'randomized' cc? {}").format("y/n"))
|
await ctx.send(_("Do you want to create a 'randomized' cc? {}").format("y/n"))
|
||||||
|
|
||||||
msg = await self.bot.wait_for("message", check=check)
|
msg = await self.bot.wait_for("message", check=check)
|
||||||
@ -129,17 +143,24 @@ class CommandObj:
|
|||||||
await ctx.send(_("What response do you want?"))
|
await ctx.send(_("What response do you want?"))
|
||||||
response = (await self.bot.wait_for("message", check=check)).content
|
response = (await self.bot.wait_for("message", check=check)).content
|
||||||
|
|
||||||
|
if response:
|
||||||
# test to raise
|
# test to raise
|
||||||
ctx.cog.prepare_args(response if isinstance(response, str) else response[0])
|
ctx.cog.prepare_args(response if isinstance(response, str) else response[0])
|
||||||
|
|
||||||
ccinfo["response"] = response
|
ccinfo["response"] = response
|
||||||
ccinfo["edited_at"] = self.get_now()
|
|
||||||
|
if cooldowns:
|
||||||
|
ccinfo.setdefault("cooldowns", {}).update(cooldowns)
|
||||||
|
for key, value in ccinfo["cooldowns"].copy().items():
|
||||||
|
if value <= 0:
|
||||||
|
del ccinfo["cooldowns"][key]
|
||||||
|
|
||||||
if author.id not in ccinfo["editors"]:
|
if author.id not in ccinfo["editors"]:
|
||||||
# Add the person who invoked the `edit` coroutine to the list of
|
# Add the person who invoked the `edit` coroutine to the list of
|
||||||
# editors, if the person is not yet in there
|
# editors, if the person is not yet in there
|
||||||
ccinfo["editors"].append(author.id)
|
ccinfo["editors"].append(author.id)
|
||||||
|
|
||||||
|
ccinfo["edited_at"] = self.get_now()
|
||||||
|
|
||||||
await self.db(ctx.guild).commands.set_raw(command, value=ccinfo)
|
await self.db(ctx.guild).commands.set_raw(command, value=ccinfo)
|
||||||
|
|
||||||
async def delete(self, ctx: commands.Context, command: str):
|
async def delete(self, ctx: commands.Context, command: str):
|
||||||
@ -162,6 +183,7 @@ class CustomCommands:
|
|||||||
self.config = Config.get_conf(self, self.key)
|
self.config = Config.get_conf(self, self.key)
|
||||||
self.config.register_guild(commands={})
|
self.config.register_guild(commands={})
|
||||||
self.commandobj = CommandObj(config=self.config, bot=self.bot)
|
self.commandobj = CommandObj(config=self.config, bot=self.bot)
|
||||||
|
self.cooldowns = {}
|
||||||
|
|
||||||
@commands.group(aliases=["cc"])
|
@commands.group(aliases=["cc"])
|
||||||
@commands.guild_only()
|
@commands.guild_only()
|
||||||
@ -182,7 +204,7 @@ class CustomCommands:
|
|||||||
|
|
||||||
@cc_add.command(name="random")
|
@cc_add.command(name="random")
|
||||||
@checks.mod_or_permissions(administrator=True)
|
@checks.mod_or_permissions(administrator=True)
|
||||||
async def cc_add_random(self, ctx: commands.Context, command: str):
|
async def cc_add_random(self, ctx: commands.Context, command: str.lower):
|
||||||
"""
|
"""
|
||||||
Create a CC where it will randomly choose a response!
|
Create a CC where it will randomly choose a response!
|
||||||
|
|
||||||
@ -205,13 +227,12 @@ class CustomCommands:
|
|||||||
|
|
||||||
@cc_add.command(name="simple")
|
@cc_add.command(name="simple")
|
||||||
@checks.mod_or_permissions(administrator=True)
|
@checks.mod_or_permissions(administrator=True)
|
||||||
async def cc_add_simple(self, ctx, command: str, *, text):
|
async def cc_add_simple(self, ctx, command: str.lower, *, text: str):
|
||||||
"""Adds a simple custom command
|
"""Adds a simple custom command
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
[p]customcom add simple yourcommand Text you want
|
[p]customcom add simple yourcommand Text you want
|
||||||
"""
|
"""
|
||||||
command = command.lower()
|
|
||||||
if command in self.bot.all_commands:
|
if command in self.bot.all_commands:
|
||||||
await ctx.send(_("That command is already a standard command."))
|
await ctx.send(_("That command is already a standard command."))
|
||||||
return
|
return
|
||||||
@ -227,16 +248,69 @@ class CustomCommands:
|
|||||||
except ArgParseError as e:
|
except ArgParseError as e:
|
||||||
await ctx.send(e.args[0])
|
await ctx.send(e.args[0])
|
||||||
|
|
||||||
|
@customcom.command(name="cooldown")
|
||||||
|
@checks.mod_or_permissions(administrator=True)
|
||||||
|
async def cc_cooldown(
|
||||||
|
self, ctx, command: str.lower, cooldown: int = None, *, per: str.lower = "member"
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Sets, edits, or views cooldowns for a custom command
|
||||||
|
|
||||||
|
You may set cooldowns per member, channel, or guild.
|
||||||
|
Multiple cooldowns may be set. All cooldowns must be cooled to call the custom command.
|
||||||
|
Example:
|
||||||
|
[p]customcom cooldown yourcommand 30
|
||||||
|
"""
|
||||||
|
if cooldown is None:
|
||||||
|
try:
|
||||||
|
cooldowns = (await self.commandobj.get(ctx.message, command))[1]
|
||||||
|
except NotFound:
|
||||||
|
return await ctx.send(_("That command doesn't exist."))
|
||||||
|
if cooldowns:
|
||||||
|
cooldown = []
|
||||||
|
for per, rate in cooldowns.items():
|
||||||
|
cooldown.append(
|
||||||
|
_("A {} may call this command every {} seconds").format(per, rate)
|
||||||
|
)
|
||||||
|
return await ctx.send("\n".join(cooldown))
|
||||||
|
else:
|
||||||
|
return await ctx.send(_("This command has no cooldown."))
|
||||||
|
per = {"server": "guild", "user": "member"}.get(per, per)
|
||||||
|
allowed = ("guild", "member", "channel")
|
||||||
|
if per not in allowed:
|
||||||
|
return await ctx.send(_("{} must be one of {}").format("per", ", ".join(allowed)))
|
||||||
|
cooldown = {per: cooldown}
|
||||||
|
try:
|
||||||
|
await self.commandobj.edit(ctx=ctx, command=command, cooldowns=cooldown, ask_for=False)
|
||||||
|
await ctx.send(_("Custom command cooldown successfully edited."))
|
||||||
|
except NotFound:
|
||||||
|
await ctx.send(
|
||||||
|
_("That command doesn't exist. Use `{}` to add it.").format(
|
||||||
|
"{}customcom add".format(ctx.prefix)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
@customcom.command(name="delete")
|
||||||
|
@checks.mod_or_permissions(administrator=True)
|
||||||
|
async def cc_delete(self, ctx, command: str.lower):
|
||||||
|
"""Deletes a custom command
|
||||||
|
|
||||||
|
Example:
|
||||||
|
[p]customcom delete yourcommand"""
|
||||||
|
try:
|
||||||
|
await self.commandobj.delete(ctx=ctx, command=command)
|
||||||
|
await ctx.send(_("Custom command successfully deleted."))
|
||||||
|
except NotFound:
|
||||||
|
await ctx.send(_("That command doesn't exist."))
|
||||||
|
|
||||||
@customcom.command(name="edit")
|
@customcom.command(name="edit")
|
||||||
@checks.mod_or_permissions(administrator=True)
|
@checks.mod_or_permissions(administrator=True)
|
||||||
async def cc_edit(self, ctx, command: str, *, text=None):
|
async def cc_edit(self, ctx, command: str.lower, *, text: str = None):
|
||||||
"""Edits a custom command
|
"""Edits a custom command's response
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
[p]customcom edit yourcommand Text you want
|
[p]customcom edit yourcommand Text you want
|
||||||
"""
|
"""
|
||||||
command = command.lower()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self.commandobj.edit(ctx=ctx, command=command, response=text)
|
await self.commandobj.edit(ctx=ctx, command=command, response=text)
|
||||||
await ctx.send(_("Custom command successfully edited."))
|
await ctx.send(_("Custom command successfully edited."))
|
||||||
@ -249,19 +323,6 @@ class CustomCommands:
|
|||||||
except ArgParseError as e:
|
except ArgParseError as e:
|
||||||
await ctx.send(e.args[0])
|
await ctx.send(e.args[0])
|
||||||
|
|
||||||
@customcom.command(name="delete")
|
|
||||||
@checks.mod_or_permissions(administrator=True)
|
|
||||||
async def cc_delete(self, ctx, command: str):
|
|
||||||
"""Deletes a custom command
|
|
||||||
Example:
|
|
||||||
[p]customcom delete yourcommand"""
|
|
||||||
command = command.lower()
|
|
||||||
try:
|
|
||||||
await self.commandobj.delete(ctx=ctx, command=command)
|
|
||||||
await ctx.send(_("Custom command successfully deleted."))
|
|
||||||
except NotFound:
|
|
||||||
await ctx.send(_("That command doesn't exist."))
|
|
||||||
|
|
||||||
@customcom.command(name="list")
|
@customcom.command(name="list")
|
||||||
async def cc_list(self, ctx):
|
async def cc_list(self, ctx):
|
||||||
"""Shows custom commands list"""
|
"""Shows custom commands list"""
|
||||||
@ -302,8 +363,8 @@ class CustomCommands:
|
|||||||
|
|
||||||
# user_allowed check, will be replaced with self.bot.user_allowed or
|
# user_allowed check, will be replaced with self.bot.user_allowed or
|
||||||
# something similar once it's added
|
# something similar once it's added
|
||||||
|
|
||||||
user_allowed = True
|
user_allowed = True
|
||||||
|
|
||||||
if len(message.content) < 2 or is_private or not user_allowed or message.author.bot:
|
if len(message.content) < 2 or is_private or not user_allowed or message.author.bot:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -313,22 +374,25 @@ class CustomCommands:
|
|||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
raw_response = await self.commandobj.get(message=message, command=ctx.invoked_with)
|
raw_response, cooldowns = await self.commandobj.get(
|
||||||
|
message=message, command=ctx.invoked_with
|
||||||
|
)
|
||||||
if isinstance(raw_response, list):
|
if isinstance(raw_response, list):
|
||||||
raw_response = random.choice(raw_response)
|
raw_response = random.choice(raw_response)
|
||||||
elif isinstance(raw_response, str):
|
elif isinstance(raw_response, str):
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
raise NotFound()
|
raise NotFound()
|
||||||
except NotFound:
|
if cooldowns:
|
||||||
|
self.test_cooldowns(ctx, ctx.invoked_with, cooldowns)
|
||||||
|
except CCError:
|
||||||
return
|
return
|
||||||
await self.call_cc_command(ctx, raw_response, message)
|
|
||||||
|
|
||||||
async def call_cc_command(self, ctx, raw_response, message) -> None:
|
|
||||||
# wrap the command here so it won't register with the bot
|
# wrap the command here so it won't register with the bot
|
||||||
fake_cc = commands.Command(ctx.invoked_with, self.cc_callback)
|
fake_cc = commands.Command(ctx.invoked_with, self.cc_callback)
|
||||||
fake_cc.params = self.prepare_args(raw_response)
|
fake_cc.params = self.prepare_args(raw_response)
|
||||||
ctx.command = fake_cc
|
ctx.command = fake_cc
|
||||||
|
|
||||||
await self.bot.invoke(ctx)
|
await self.bot.invoke(ctx)
|
||||||
if not ctx.command_failed:
|
if not ctx.command_failed:
|
||||||
await self.cc_command(*ctx.args, **ctx.kwargs, raw_response=raw_response)
|
await self.cc_command(*ctx.args, **ctx.kwargs, raw_response=raw_response)
|
||||||
@ -429,6 +493,25 @@ class CustomCommands:
|
|||||||
fin = default + [(p.name, p) for p in fin]
|
fin = default + [(p.name, p) for p in fin]
|
||||||
return OrderedDict(fin)
|
return OrderedDict(fin)
|
||||||
|
|
||||||
|
def test_cooldowns(self, ctx, command, cooldowns):
|
||||||
|
now = datetime.utcnow()
|
||||||
|
new_cooldowns = {}
|
||||||
|
for per, rate in cooldowns.items():
|
||||||
|
if per == "guild":
|
||||||
|
key = (command, ctx.guild)
|
||||||
|
elif per == "channel":
|
||||||
|
key = (command, ctx.guild, ctx.channel)
|
||||||
|
elif per == "member":
|
||||||
|
key = (command, ctx.guild, ctx.author)
|
||||||
|
cooldown = self.cooldowns.get(key)
|
||||||
|
if cooldown:
|
||||||
|
cooldown += timedelta(seconds=rate)
|
||||||
|
if cooldown > now:
|
||||||
|
raise OnCooldown()
|
||||||
|
new_cooldowns[key] = now
|
||||||
|
# only update cooldowns if the command isn't on cooldown
|
||||||
|
self.cooldowns.update(new_cooldowns)
|
||||||
|
|
||||||
def transform_arg(self, result, attr, obj) -> str:
|
def transform_arg(self, result, attr, obj) -> str:
|
||||||
attr = attr[1:] # strip initial dot
|
attr = attr[1:] # strip initial dot
|
||||||
if not attr:
|
if not attr:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user