jack1142 febca8ccbb
Migration to discord.py 2.0 (#5600)
* Temporarily set d.py to use latest git revision

* Remove `bot` param to Client.start

* Switch to aware datetimes

Much of this is removing `.replace(...)` calls, which, while not technically
needed, simplifies the code base. Only a few of the changes are
actually necessary here.
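
As a rough illustration of why such calls become unnecessary, the stdlib-only sketch below contrasts naive and aware datetimes. It assumes the removed calls were of the form `.replace(tzinfo=timezone.utc)`, which this message does not spell out, and the sample values are made up.

```python
from datetime import datetime, timezone

# A naive UTC datetime stands in for a value without timezone info.
naive = datetime(2022, 4, 3, 1, 21, 20)

# Old pattern (assumed): attach the UTC timezone by hand before comparing.
patched = naive.replace(tzinfo=timezone.utc)

# An aware datetime stands in for a value that already carries its timezone.
aware = datetime(2022, 4, 3, 1, 21, 20, tzinfo=timezone.utc)

# Aware values compare directly with other aware values.
same_instant = patched == aware
is_past = aware < datetime.now(timezone.utc)

# Ordering a naive value against an aware one raises TypeError.
try:
    naive < aware
    mixing_fails = False
except TypeError:
    mixing_fails = True
```

Once every value is aware, the manual patching step can simply be dropped.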

* Update to work with new Asset design

* [threads] Update core ModLog API to support threads

- Added proper support for passing `Thread` to `channel`
  when creating/editing case
- Added `parent_channel_id` attribute to Modlog API's Case
    - Added `parent_channel` property that tries to get parent channel
- Updated case's content to show both thread and parent information

* [threads] Disallow usage of threads in some of the commands

- announceset channel
- filter channel clear
- filter channel add
- filter channel remove
- GlobalUniqueObjectFinder converter
    - permissions addglobalrule
    - permissions removeglobalrule
    - permissions removeserverrule
    - Permissions cog does not perform any validation for IDs
      when setting through YAML so that has not been touched
- streamalert twitch/youtube/picarto
- embedset channel
- set ownernotifications adddestination

* [threads] Handle threads in Red's permissions system (Requires)

- Made permissions system apply rules of (only) parent in threads

* [threads] Update embed_requested to support threads

- Threads don't have their own embed settings and inherit from parent

* [threads] Update Red.message_eligible_as_command to support threads

* [threads] Properly handle invocation of [p](un)mutechannel in threads

Using `[p](un)mutechannel` in a thread will mute/unmute the user in the
parent channel.

* [threads] Update Filter cog to properly handle threads

- `[p]filter channel list` in a thread sends the list for the parent channel
- Checking for filter hits for a message in a thread checks its parent
  channel's word list. There's no separate word list for threads.
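
The parent lookup described above could be sketched roughly as follows. `FakeChannel` and `word_list_for` are hypothetical stand-ins, not names from the Filter cog, and the word lists are a plain dict keyed by channel ID for illustration only.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set


@dataclass(frozen=True)
class FakeChannel:
    """Hypothetical stand-in for a text channel or a thread."""

    id: int
    parent: Optional["FakeChannel"] = None  # set only for threads


def word_list_for(channel: FakeChannel, word_lists: Dict[int, Set[str]]) -> Set[str]:
    # Threads have no word list of their own, so resolve to the parent first.
    target = channel.parent if channel.parent is not None else channel
    return word_lists.get(target.id, set())


general = FakeChannel(id=1)
thread = FakeChannel(id=2, parent=general)
word_lists = {1: {"spam"}}
```

Here `word_list_for(thread, word_lists)` and `word_list_for(general, word_lists)` both resolve to the list stored for channel 1.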

* [threads] Support threads in Audio cog

- Handle threads being notify channels
- Update type hint for `is_query_allowed()`

* [threads] Update type hints and documentation to reflect thread support

- Documented that `{channel}` in CCs might be a thread
- Allowed (documented) usage of threads with `Config.channel()`
    - A separate thread scope is still in the picture, though;
      if it were to be done, it would be in a separate PR
- GuildContext.channel might be Thread

* Use less costly channel check in customcom's on_message_without_command

This isn't needed for d.py 2.0 but whatever...

* Update for in-place edits

* Embed's bool changed behavior, I'm hoping it doesn't affect us

* Address User.permissions_in() removal

* Swap VerificationLevel.extreme with VerificationLevel.highest

* Change to keyword-only parameters

* Change of `Guild.vanity_invite()` return type

* avatar -> display_avatar

* Fix metaclass shenanigans with Converter

* Update Red.add_cog() to be in line with `dpy_commands.Bot.add_cog()`

This means adding `override` keyword-only parameter and causing
small breakage by swapping RuntimeError with discord.ClientException.

* Address all DEP-WARNs

* Remove Context.clean_prefix and use upstream implementation instead

* Remove commands.Literal and use upstream implementation instead

Honestly, this was a rather bad implementation anyway...

Breaking but actually not really - it was provisional.

* Update Command.callback's setter

Support for functools.partial is now built into d.py

* Add new perms in HUMANIZED_PERM mapping (some from d.py 1.7 it seems)

BTW, that should really be in core instead of what we have now...

* Remove the part of do_conversion that has not worked for a long while

* Stop wrapping BadArgument in ConversionFailure

This is breaking but it's best to resolve it like this.

The functionality of ConversionFailure can be replicated with
Context.current_parameter and Context.current_argument.

* Add custom errors for int and float converters

* Remove Command.__call__ as it's now implemented in d.py

* Get rid of _dpy_reimplements

These were reimplemented for the purpose of typing
so it is no longer needed now that d.py is type hinted.

* Add return to Red.remove_cog

* Ensure we don't delete messages that differ only by used sticker

* discord.InvalidArgument->ValueError

* Move from raw <t:...> syntax to discord.utils.format_dt()
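
For context, Discord's timestamp markup has the form `<t:UNIX_SECONDS>` or `<t:UNIX_SECONDS:STYLE>`. The sketch below is a stdlib-only stand-in that builds such a string by hand; the name `format_timestamp` is hypothetical, and in the bot itself `discord.utils.format_dt()` takes over this job.

```python
from datetime import datetime, timezone
from typing import Optional


def format_timestamp(dt: datetime, style: Optional[str] = None) -> str:
    # Discord expects whole seconds since the Unix epoch.
    unix = int(dt.timestamp())
    if style is None:
        return f"<t:{unix}>"
    return f"<t:{unix}:{style}>"


epoch = datetime.fromtimestamp(0, tz=timezone.utc)
plain = format_timestamp(epoch)
relative = format_timestamp(epoch, "R")
```

Delegating to a single helper keeps the markup in one place instead of scattering hand-written f-strings across cogs.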

* Address AsyncIter removal

* Swap to pos-only for params that are pos-only in upstream

* Update for changes to Command.params

* [threads] Support threads in ignore checks and allow ignoring them

- Updated `[p](un)ignore channel` to accept threads
- Updated `[p]ignore list` to list ignored threads
- Updated logic in `Red.ignored_channel_or_guild()`

Ignores for guild channels now work as follows (only changes for threads):
- if channel is not a thread:
    - check if user has manage channels perm in channel
      and allow command usage if so
    - check if channel is ignored and disallow command usage if so
    - allow command usage if none of the conditions above happened
- if channel is a thread:
    - check if user has manage channels perm in parent channel
      and allow command usage if so
    - check if parent channel is ignored and disallow command usage
      if so
    - check if user has manage thread perm in parent channel
      and allow command usage if so
    - check if thread is ignored and disallow command usage if so
    - allow command usage if none of the conditions above happened
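
The decision order above can be sketched as a pure function. This is only an illustration under stated assumptions: `IgnoreCheck` and `command_allowed` are hypothetical names, permissions are passed in as precomputed booleans, and the ignored IDs are a plain set rather than Config data.

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass
class IgnoreCheck:
    """Hypothetical inputs for one command invocation in a guild channel."""

    channel_id: int
    parent_id: Optional[int]  # None when the channel is not a thread
    manage_channels: bool  # in the channel itself, or in the parent for threads
    manage_threads: bool  # in the parent channel; unused for non-threads


def command_allowed(check: IgnoreCheck, ignored: Set[int]) -> bool:
    if check.parent_id is None:
        # Not a thread: the permission bypasses the ignore list.
        if check.manage_channels:
            return True
        return check.channel_id not in ignored
    # Thread: parent-level checks come first.
    if check.manage_channels:
        return True
    if check.parent_id in ignored:
        return False
    # Then thread-level checks.
    if check.manage_threads:
        return True
    return check.channel_id not in ignored
```

Note that an ignored parent channel blocks a user who only has the manage threads permission, because that check runs after the parent's ignore check.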

* [partial] Raise TypeError when channel is of PartialMessageable type

- Red.embed_requested
- Red.ignored_channel_or_guild

* [partial] Discard command messages when channel is PartialMessageable

* [threads] Add utilities for checking appropriate perms in both channels & threads

* [threads] Update code to use can_react_in() and @bot_can_react()

* [threads] Update code to use can_send_messages_in

* [threads] Add send_messages_in_threads perm to mute role and overrides

* [threads] Update code to use (bot/user)_can_manage_channel

* [threads] Update [p]diagnoseissues to work with threads

* Type hint fix

* [threads] Patch vendored discord.ext.menus to check proper perms in threads

I guess we've reached time when we have to patch the lib we vendor...

* Make docs generation work with non-final d.py releases

* Update discord.utils.oauth_url() usage

* Swap usage of discord.Embed.Empty/discord.embeds.EmptyEmbed to None

* Update usage of Guild.member_count to work with `None`

* Switch from Guild.vanity_invite() to Guild.vanity_url

* Update startup process to work with d.py's new asynchronous startup

* Use setup_hook() for pre-connect actions

* Update core's add_cog, remove_cog, and load_extension methods

* Update all setup functions to async and add awaits to bot.add_cog calls

* Modernize cogs by using async cog_load and cog_unload

* Address StoreChannel removal

* [partial] Disallow passing PartialMessageable to Case.channel

* [partial] Update cogs and utils to work better with PartialMessageable

- Ignore messages with PartialMessageable channel in CustomCommands cog
- In Filter cog, don't pass channel to modlog.create_case()
  if it's PartialMessageable
- In Trivia cog, only compare channel IDs
- Make `.utils.menus.menu()` work for messages
  with PartialMessageable channel
- Make checks in `.utils.tunnel.Tunnel.communicate()` more rigid

* Add a few missing DEP-WARNs
2022-04-03 03:21:20 +02:00

import asyncio
import re
import random
from datetime import datetime, timedelta
from inspect import Parameter
from typing import Iterable, List, Mapping, Tuple, Dict, Set, Literal, Union
from urllib.parse import quote_plus

import discord
from fuzzywuzzy import process

from redbot.core import Config, checks, commands
from redbot.core.i18n import Translator, cog_i18n
from redbot.core.utils import menus, AsyncIter
from redbot.core.utils.chat_formatting import box, pagify, escape, humanize_list
from redbot.core.utils.predicates import MessagePredicate

_ = Translator("CustomCommands", __file__)


class CCError(Exception):
    pass


class AlreadyExists(CCError):
    pass


class ArgParseError(CCError):
    pass


class NotFound(CCError):
    pass


class OnCooldown(CCError):
    pass


class CommandNotEdited(CCError):
    pass


class ResponseTooLong(CCError):
    pass


class CommandObj:
    def __init__(self, **kwargs):
        self.config = kwargs.get("config")
        self.bot = kwargs.get("bot")
        self.db = self.config.guild

    @staticmethod
    async def get_commands(config) -> dict:
        _commands = await config.commands()
        return {k: v for k, v in _commands.items() if _commands[k]}

    async def redact_author_ids(self, user_id: int):
        all_guilds = await self.config.all_guilds()

        for guild_id in all_guilds.keys():
            await asyncio.sleep(0)
            async with self.config.guild_from_id(guild_id).commands() as all_commands:
                async for com_name, com_info in AsyncIter(all_commands.items(), steps=100):
                    if not com_info:
                        continue

                    if com_info.get("author", {}).get("id", 0) == user_id:
                        com_info["author"]["id"] = 0xDE1
                        com_info["author"]["name"] = "Deleted User"

                    if editors := com_info.get("editors", None):
                        for index, editor_id in enumerate(editors):
                            if editor_id == user_id:
                                editors[index] = 0xDE1

    async def get_responses(self, ctx):
        intro = _(
            "Welcome to the interactive random {cc} maker!\n"
            "Every message you send will be added as one of the random "
            "responses to choose from once this {cc} is "
            "triggered. To exit this interactive menu, type `{quit}`"
        ).format(cc="customcommand", quit="exit()")
        await ctx.send(intro)

        responses = []
        args = None
        while True:
            await ctx.send(_("Add a random response:"))
            msg = await self.bot.wait_for("message", check=MessagePredicate.same_context(ctx))

            if msg.content.lower() == "exit()":
                break
            elif len(msg.content) > 2000:
                await ctx.send(
                    _(
                        "The text response you're trying to create has more than 2000 characters.\n"
                        "I cannot send messages that are longer than 2000 characters, please try again."
                    )
                )
                continue
            else:
                try:
                    this_args = ctx.cog.prepare_args(msg.content)
                except ArgParseError as e:
                    await ctx.send(e.args[0])
                    continue
                if args and args != this_args:
                    await ctx.send(_("Random responses must take the same arguments!"))
                    continue
                args = args or this_args
                responses.append(msg.content)
        return responses

    @staticmethod
    def get_now() -> str:
        # Get current time as a string, for 'created_at' and 'edited_at' fields
        # in the ccinfo dict
        return "{:%d/%m/%Y %H:%M:%S}".format(datetime.utcnow())

    async def get(self, message: discord.Message, command: str) -> Tuple[str, Dict]:
        if not command:
            raise NotFound()
        ccinfo = await self.db(message.guild).commands.get_raw(command, default=None)
        if not ccinfo:
            raise NotFound()
        else:
            return ccinfo["response"], ccinfo.get("cooldowns", {})

    async def get_full(self, message: discord.Message, command: str) -> Dict:
        ccinfo = await self.db(message.guild).commands.get_raw(command, default=None)
        if ccinfo:
            return ccinfo
        else:
            raise NotFound()

    async def create(
        self, ctx: commands.Context, command: str, *, response: Union[str, List[str]]
    ):
        """Create a custom command"""
        # Check if this command is already registered as a customcommand
        if await self.db(ctx.guild).commands.get_raw(command, default=None):
            raise AlreadyExists()
        # Check against those pesky nitro users!
        if isinstance(response, str) and len(response) > 2000:
            raise ResponseTooLong()
        elif isinstance(response, list) and any([len(i) > 2000 for i in response]):
            raise ResponseTooLong()
        # test to raise
        ctx.cog.prepare_args(response if isinstance(response, str) else response[0])
        author = ctx.message.author
        ccinfo = {
            "author": {"id": author.id, "name": str(author)},
            "command": command,
            "cooldowns": {},
            "created_at": self.get_now(),
            "editors": [],
            "response": response,
        }
        await self.db(ctx.guild).commands.set_raw(command, value=ccinfo)

    async def edit(
        self,
        ctx: commands.Context,
        command: str,
        *,
        response=None,
        cooldowns: Mapping[str, int] = None,
        ask_for: bool = True,
    ):
        """Edit an already existing custom command"""
        ccinfo = await self.db(ctx.guild).commands.get_raw(command, default=None)

        # Check if this command is registered
        if not ccinfo:
            raise NotFound()

        author = ctx.message.author

        if ask_for and not response:
            await ctx.send(_("Do you want to create a 'randomized' custom command?") + " (yes/no)")

            pred = MessagePredicate.yes_or_no(ctx)
            try:
                await self.bot.wait_for("message", check=pred, timeout=30)
            except asyncio.TimeoutError:
                await ctx.send(_("Response timed out, please try again later."))
                raise CommandNotEdited()
            if pred.result is True:
                response = await self.get_responses(ctx=ctx)
            else:
                await ctx.send(_("What response do you want?"))
                try:
                    resp = await self.bot.wait_for(
                        "message", check=MessagePredicate.same_context(ctx), timeout=180
                    )
                except asyncio.TimeoutError:
                    await ctx.send(_("Response timed out, please try again later."))
                    raise CommandNotEdited()
                response = resp.content

        if response:
            # test to raise
            if len(response) > 2000:
                raise ResponseTooLong()
            ctx.cog.prepare_args(response if isinstance(response, str) else response[0])
            ccinfo["response"] = response

        if cooldowns:
            ccinfo.setdefault("cooldowns", {}).update(cooldowns)
            for key, value in ccinfo["cooldowns"].copy().items():
                if value <= 0:
                    del ccinfo["cooldowns"][key]

        if author.id not in ccinfo["editors"]:
            # Add the person who invoked the `edit` coroutine to the list of
            # editors, if the person is not yet in there
            ccinfo["editors"].append(author.id)

        ccinfo["edited_at"] = self.get_now()

        await self.db(ctx.guild).commands.set_raw(command, value=ccinfo)

    async def delete(self, ctx: commands.Context, command: str):
        """Delete an already existing custom command"""
        # Check if this command is registered
        if not await self.db(ctx.guild).commands.get_raw(command, default=None):
            raise NotFound()
        await self.db(ctx.guild).commands.set_raw(command, value=None)


@cog_i18n(_)
class CustomCommands(commands.Cog):
    """This cog contains commands for creating and managing custom commands that display text.

    These are useful for storing information members might need, like FAQ answers or invite links.
    Custom commands can be used by anyone by default, so be careful with pings.
    Commands can only be lowercase, and will not respond to any uppercase letters.
    """

    def __init__(self, bot):
        super().__init__()
        self.bot = bot
        self.key = 414589031223512
        self.config = Config.get_conf(self, self.key)
        self.config.register_guild(commands={})
        self.commandobj = CommandObj(config=self.config, bot=self.bot)
        self.cooldowns = {}

    async def red_delete_data_for_user(
        self,
        *,
        requester: Literal["discord_deleted_user", "owner", "user", "user_strict"],
        user_id: int,
    ):
        # Only redact author IDs when Discord reports the account as deleted
        if requester != "discord_deleted_user":
            return

        await self.commandobj.redact_author_ids(user_id)

    @commands.group(aliases=["cc"])
    @commands.guild_only()
    async def customcom(self, ctx: commands.Context):
        """Base command for Custom Commands management."""
        pass

    @customcom.command(name="raw")
    async def cc_raw(self, ctx: commands.Context, command: str.lower):
        """Get the raw response of a custom command, to get the proper markdown.

        This is helpful for copy and pasting.

        **Arguments:**

        - `<command>` The custom command to get the raw response of."""
        commands = await self.config.guild(ctx.guild).commands()
        if command not in commands:
            return await ctx.send(_("That command doesn't exist."))
        command = commands[command]
        if isinstance(command["response"], str):
            raw = discord.utils.escape_markdown(command["response"])
            if len(raw) > 2000:
                raw = f"{raw[:1997]}..."
            await ctx.send(raw)
        else:
            msglist = []
            if await ctx.embed_requested():
                colour = await ctx.embed_colour()
                for number, response in enumerate(command["response"], start=1):
                    raw = discord.utils.escape_markdown(response)
                    if len(raw) > 2048:
                        raw = f"{raw[:2045]}..."
                    embed = discord.Embed(
                        title=_("Response #{num}/{total}").format(
                            num=number, total=len(command["response"])
                        ),
                        description=raw,
                        colour=colour,
                    )
                    msglist.append(embed)
            else:
                for number, response in enumerate(command["response"], start=1):
                    raw = discord.utils.escape_markdown(response)
                    msg = _("Response #{num}/{total}:\n{raw}").format(
                        num=number, total=len(command["response"]), raw=raw
                    )
                    if len(msg) > 2000:
                        msg = f"{msg[:1997]}..."
                    msglist.append(msg)
            await menus.menu(ctx, msglist, menus.DEFAULT_CONTROLS)

    @customcom.command(name="search")
    @commands.guild_only()
    async def cc_search(self, ctx: commands.Context, *, query):
        """
        Searches through custom commands, according to the query.

        Uses fuzzywuzzy searching to find close matches.

        **Arguments:**

        - `<query>` The query to search for. Can be multiple words.
        """
        cc_commands = await CommandObj.get_commands(self.config.guild(ctx.guild))
        extracted = process.extract(query, list(cc_commands.keys()))
        accepted = []
        for entry in extracted:
            if entry[1] > 60:
                # Match was decently strong
                accepted.append((entry[0], cc_commands[entry[0]]))
            else:
                # Match wasn't strong enough
                pass
        if len(accepted) == 0:
            return await ctx.send(_("No close matches were found."))
        results = self.prepare_command_list(ctx, accepted)
        if await ctx.embed_requested():
            content = " \n".join(map("**{0[0]}** {0[1]}".format, results))
            embed = discord.Embed(
                title=_("Search results"), description=content, colour=await ctx.embed_colour()
            )
            await ctx.send(embed=embed)
        else:
            content = "\n".join(map("{0[0]:<12} : {0[1]}".format, results))
            await ctx.send(_("The following matches have been found:") + box(content))

    @customcom.group(name="create", aliases=["add"], invoke_without_command=True)
    @checks.mod_or_permissions(administrator=True)
    async def cc_create(self, ctx: commands.Context, command: str.lower, *, text: str):
        """Create custom commands.

        If a type is not specified, a simple CC will be created.
        CCs can be enhanced with arguments, see the guide
        [here](https://docs.discord.red/en/stable/cog_customcom.html).
        """
        await ctx.invoke(self.cc_create_simple, command=command, text=text)

    @cc_create.command(name="random")
    @checks.mod_or_permissions(administrator=True)
    async def cc_create_random(self, ctx: commands.Context, command: str.lower):
        """Create a CC where it will randomly choose a response!

        Note: This command is interactive.

        **Arguments:**

        - `<command>` The command executed to return the text. Cast to lowercase.
        """
        if any(char.isspace() for char in command):
            # Haha, nice try
            await ctx.send(_("Custom command names cannot have spaces in them."))
            return
        if command in (*self.bot.all_commands, *commands.RESERVED_COMMAND_NAMES):
            await ctx.send(_("There already exists a bot command with the same name."))
            return
        responses = await self.commandobj.get_responses(ctx=ctx)
        if not responses:
            await ctx.send(_("Custom command process cancelled."))
            return
        try:
            await self.commandobj.create(ctx=ctx, command=command, response=responses)
            await ctx.send(_("Custom command successfully added."))
        except AlreadyExists:
            await ctx.send(
                _("This command already exists. Use `{command}` to edit it.").format(
                    command=f"{ctx.clean_prefix}customcom edit"
                )
            )
        except ResponseTooLong:  # This isn't needed, however may be a good idea to keep this.
            await ctx.send(
                _(
                    "The text response you're trying to create has more than 2000 characters.\n"
                    "I cannot send messages that are longer than 2000 characters."
                )
            )

    @cc_create.command(name="simple")
    @checks.mod_or_permissions(administrator=True)
    async def cc_create_simple(self, ctx, command: str.lower, *, text: str):
        """Add a simple custom command.

        Example:
        - `[p]customcom create simple yourcommand Text you want`

        **Arguments:**

        - `<command>` The command executed to return the text. Cast to lowercase.
        - `<text>` The text to return when executing the command. See guide for enhanced usage.
        """
        if any(char.isspace() for char in command):
            # Haha, nice try
            await ctx.send(_("Custom command names cannot have spaces in them."))
            return
        if command in (*self.bot.all_commands, *commands.RESERVED_COMMAND_NAMES):
            await ctx.send(_("There already exists a bot command with the same name."))
            return
        try:
            await self.commandobj.create(ctx=ctx, command=command, response=text)
            await ctx.send(_("Custom command successfully added."))
        except AlreadyExists:
            await ctx.send(
                _("This command already exists. Use `{command}` to edit it.").format(
                    command=f"{ctx.clean_prefix}customcom edit"
                )
            )
        except ArgParseError as e:
            await ctx.send(e.args[0])
        except ResponseTooLong:
            await ctx.send(
                _(
                    "The text response you're trying to create has more than 2000 characters.\n"
                    "I cannot send messages that are longer than 2000 characters."
                )
            )

    @customcom.command(name="cooldown")
    @checks.mod_or_permissions(administrator=True)
    async def cc_cooldown(
        self, ctx, command: str.lower, cooldown: int = None, *, per: str.lower = "member"
    ):
        """Set, edit, or view the cooldown for a custom command.

        You may set cooldowns per member, channel, or guild. Multiple
        cooldowns may be set. All cooldowns must be cooled to call the
        custom command.

        Examples:
        - `[p]customcom cooldown pingrole`
        - `[p]customcom cooldown yourcommand 30`
        - `[p]cc cooldown mycommand 30 guild`

        **Arguments:**

        - `<command>` The custom command to check or set the cooldown.
        - `<cooldown>` The number of seconds to wait before allowing the command to be invoked again. If omitted, will instead return the current cooldown settings.
        - `<per>` The group to apply the cooldown on. Defaults to per member. Valid choices are server, guild, user, member, and channel.
        """
        if cooldown is None:
            try:
                cooldowns = (await self.commandobj.get(ctx.message, command))[1]
            except NotFound:
                return await ctx.send(_("That command doesn't exist."))
            if cooldowns:
                cooldown = []
                for per, rate in cooldowns.items():
                    cooldown.append(
                        _("A {} may call this command every {} seconds").format(per, rate)
                    )
                return await ctx.send("\n".join(cooldown))
            else:
                return await ctx.send(_("This command has no cooldown."))
        # Normalize the aliases "server" and "user" to their canonical names
        per = {"server": "guild", "user": "member"}.get(per, per)
        allowed = ("guild", "member", "channel")
        if per not in allowed:
            return await ctx.send(_("{} must be one of {}").format("per", ", ".join(allowed)))
        cooldown = {per: cooldown}
        try:
            await self.commandobj.edit(ctx=ctx, command=command, cooldowns=cooldown, ask_for=False)
            await ctx.send(_("Custom command cooldown successfully edited."))
        except NotFound:
            await ctx.send(
                _("That command doesn't exist. Use `{command}` to add it.").format(
                    command=f"{ctx.clean_prefix}customcom create"
                )
            )

    @customcom.command(name="delete", aliases=["del", "remove"])
    @checks.mod_or_permissions(administrator=True)
    async def cc_delete(self, ctx, command: str.lower):
        """Delete a custom command.

        Example:
        - `[p]customcom delete yourcommand`

        **Arguments:**

        - `<command>` The custom command to delete.
        """
        try:
            await self.commandobj.delete(ctx=ctx, command=command)
            await ctx.send(_("Custom command successfully deleted."))
        except NotFound:
            await ctx.send(_("That command doesn't exist."))

    @customcom.command(name="edit")
    @checks.mod_or_permissions(administrator=True)
    async def cc_edit(self, ctx, command: str.lower, *, text: str = None):
        """Edit a custom command.

        Example:
        - `[p]customcom edit yourcommand Text you want`

        **Arguments:**

        - `<command>` The custom command to edit.
        - `<text>` The new text to return when executing the command.
        """
        try:
            await self.commandobj.edit(ctx=ctx, command=command, response=text)
            await ctx.send(_("Custom command successfully edited."))
        except NotFound:
            await ctx.send(
                _("That command doesn't exist. Use `{command}` to add it.").format(
                    command=f"{ctx.clean_prefix}customcom create"
                )
            )
        except ArgParseError as e:
            await ctx.send(e.args[0])
        except CommandNotEdited:
            pass
        except ResponseTooLong:
            await ctx.send(
                _(
                    "The text response you're trying to create has more than 2000 characters.\n"
                    "I cannot send messages that are longer than 2000 characters."
                )
            )

    @customcom.command(name="list")
    @commands.bot_can_react()
    async def cc_list(self, ctx: commands.Context):
        """List all available custom commands.

        The list displays a preview of each command's response, with
        markdown escaped and newlines replaced with spaces.
        """
        cc_dict = await CommandObj.get_commands(self.config.guild(ctx.guild))

        if not cc_dict:
            await ctx.send(
                _(
                    "There are no custom commands in this server."
                    " Use `{command}` to start adding some."
                ).format(command=f"{ctx.clean_prefix}customcom create")
            )
            return

        results = self.prepare_command_list(ctx, sorted(cc_dict.items(), key=lambda t: t[0]))

        if await ctx.embed_requested():
            # We need a space before the newline in case the CC preview ends in a link (GH-2295)
            content = " \n".join(map("**{0[0]}** {0[1]}".format, results))
            pages = list(pagify(content, page_length=1024))
            embed_pages = []
            for idx, page in enumerate(pages, start=1):
                embed = discord.Embed(
                    title=_("Custom Command List"),
                    description=page,
                    colour=await ctx.embed_colour(),
                )
                embed.set_footer(text=_("Page {num}/{total}").format(num=idx, total=len(pages)))
                embed_pages.append(embed)
            await menus.menu(ctx, embed_pages, menus.DEFAULT_CONTROLS)
        else:
            content = "\n".join(map("{0[0]:<12} : {0[1]}".format, results))
            pages = list(map(box, pagify(content, page_length=2000, shorten_by=10)))
            await menus.menu(ctx, pages, menus.DEFAULT_CONTROLS)

    @customcom.command(name="show")
    async def cc_show(self, ctx, command_name: str):
        """Shows a custom command's responses and its settings.

        **Arguments:**

        - `<command_name>` The custom command to show.
        """

        try:
            cmd = await self.commandobj.get_full(ctx.message, command_name)
        except NotFound:
            await ctx.send(_("I could not find that custom command."))
            return

        responses = cmd["response"]

        if isinstance(responses, str):
            responses = [responses]

        _aid = cmd["author"]["id"]

        # 0xDE1 is the placeholder ID written by redact_author_ids()
        if _aid == 0xDE1:
            author = _("Deleted User")
        elif member := ctx.guild.get_member(_aid):
            author = f"{member} ({_aid})"
        else:
            author = f"{cmd['author']['name']} ({_aid})"

        _type = _("Random") if len(responses) > 1 else _("Normal")

        text = _(
            "Command: {command_name}\n"
            "Author: {author}\n"
            "Created: {created_at}\n"
            "Type: {type}\n"
        ).format(
            command_name=command_name, author=author, created_at=cmd["created_at"], type=_type
        )

        cooldowns = cmd.get("cooldowns", {})

        if cooldowns:
            cooldown_text = _("Cooldowns:\n")
            # Keys are the scope ("guild", "channel", "member"); values are seconds
            for per, rate in cooldowns.items():
                cooldown_text += _("{num} seconds per {period}\n").format(num=rate, period=per)
            text += cooldown_text

        text += _("Responses:\n")
        responses = ["- " + r for r in responses]
        text += "\n".join(responses)

        for p in pagify(text):
            await ctx.send(box(p, lang="yaml"))

    @commands.Cog.listener()
    async def on_message_without_command(self, message):
        is_private = message.guild is None

        # user_allowed check, will be replaced with self.bot.user_allowed or
        # something similar once it's added
        user_allowed = True

        if isinstance(message.channel, discord.PartialMessageable):
            return

        if len(message.content) < 2 or is_private or not user_allowed or message.author.bot:
            return

        if await self.bot.cog_disabled_in_guild(self, message.guild):
            return

        ctx = await self.bot.get_context(message)

        if ctx.prefix is None:
            return

        try:
            raw_response, cooldowns = await self.commandobj.get(
                message=message, command=ctx.invoked_with
            )
            if isinstance(raw_response, list):
                raw_response = random.choice(raw_response)
            elif isinstance(raw_response, str):
                pass
            else:
                raise NotFound()
            if cooldowns:
                self.test_cooldowns(ctx, ctx.invoked_with, cooldowns)
        except CCError:
            return

        # wrap the command here so it won't register with the bot
        fake_cc = commands.command(name=ctx.invoked_with)(self.cc_callback)
        fake_cc.params = self.prepare_args(raw_response)
        fake_cc.requires.ready_event.set()
        ctx.command = fake_cc

        await self.bot.invoke(ctx)
        if not ctx.command_failed:
            await self.cc_command(*ctx.args, **ctx.kwargs, raw_response=raw_response)

    async def cc_callback(self, *args, **kwargs) -> None:
        """
        Custom command.

        Created via the CustomCom cog. See `[p]customcom` for more details.
        """
        # fake command to take advantage of discord.py's parsing and events
        pass

    async def cc_command(self, ctx, *cc_args, raw_response, **cc_kwargs) -> None:
        cc_args = (*cc_args, *cc_kwargs.values())
        results = re.findall(r"{([^}]+)\}", raw_response)
        for result in results:
            param = self.transform_parameter(result, ctx.message)
            raw_response = raw_response.replace("{" + result + "}", param)
        results = re.findall(r"{((\d+)[^.}]*(\.[^:}]+)?[^}]*)\}", raw_response)
        if results:
            low = min(int(result[1]) for result in results)
            for result in results:
                index = int(result[1]) - low
                arg = self.transform_arg(result[0], result[2], cc_args[index])
                raw_response = raw_response.replace("{" + result[0] + "}", arg)
        await ctx.send(raw_response)

    @staticmethod
    def prepare_args(raw_response) -> Mapping[str, Parameter]:
        args = re.findall(r"{(\d+)[^:}]*(:[^.}]*)?[^}]*\}", raw_response)
        if not args:
            return {}
        allowed_builtins = {
            "bool": bool,
            "complex": complex,
            "float": float,
            "frozenset": frozenset,
            "int": int,
            "list": list,
            "set": set,
            "str": str,
            "tuple": tuple,
            "query": quote_plus,
        }
        indices = [int(a[0]) for a in args]
        low = min(indices)
        indices = [a - low for a in indices]
        high = max(indices)
        if high > 9:
            raise ArgParseError(_("Too many arguments!"))
        gaps = set(indices).symmetric_difference(range(high + 1))
        if gaps:
            raise ArgParseError(
                _("Arguments must be sequential. Missing arguments: ")
                + ", ".join(str(i + low) for i in gaps)
            )
        fin = [Parameter("_" + str(i), Parameter.POSITIONAL_OR_KEYWORD) for i in range(high + 1)]
        for arg in args:
            index = int(arg[0]) - low
            anno_raw = arg[1][1:]  # strip initial colon
            if anno_raw.lower().endswith("converter"):
                anno_raw = anno_raw[:-9]
            if not anno_raw or anno_raw.startswith("_"):  # public types only
                name = "{}_{}".format("text", index if index < high else "final")
                fin[index] = fin[index].replace(name=name)
                continue
            # allow type hinting only for discord.py and builtin types
            try:
                anno = getattr(discord, anno_raw)
                # force an AttributeError if there's no discord.py converter
                getattr(commands, anno.__name__ + "Converter")
            except AttributeError:
                anno = allowed_builtins.get(anno_raw.lower(), Parameter.empty)
            if (
                anno is not Parameter.empty
                and fin[index].annotation is not Parameter.empty
                and anno != fin[index].annotation
            ):
                raise ArgParseError(
                    _(
                        'Conflicting colon notation for argument {index}: "{name1}" and "{name2}".'
                    ).format(
                        index=index + low,
                        name1=fin[index].annotation.__name__,
                        name2=anno.__name__,
                    )
                )
            if anno is not Parameter.empty:
                fin[index] = fin[index].replace(annotation=anno)
        # consume rest
        fin[-1] = fin[-1].replace(kind=Parameter.KEYWORD_ONLY)
        # name the parameters for the help text
        for i, param in enumerate(fin):
            anno = param.annotation
            name = "{}_{}".format(
                "text" if anno is Parameter.empty else anno.__name__.lower(),
                i if i < high else "final",
            )
            fin[i] = fin[i].replace(name=name)
        return dict((p.name, p) for p in fin)

    def test_cooldowns(self, ctx, command, cooldowns):
        now = datetime.utcnow()
        new_cooldowns = {}
        for per, rate in cooldowns.items():
            if per == "guild":
                key = (command, ctx.guild)
            elif per == "channel":
                key = (command, ctx.guild, ctx.channel)
            elif per == "member":
                key = (command, ctx.guild, ctx.author)
            else:
                raise ValueError(per)
            cooldown = self.cooldowns.get(key)
            if cooldown:
                cooldown += timedelta(seconds=rate)
                if cooldown > now:
                    raise OnCooldown()
            new_cooldowns[key] = now
        # only update cooldowns if the command isn't on cooldown
        self.cooldowns.update(new_cooldowns)

    @classmethod
    def transform_arg(cls, result, attr, obj) -> str:
        attr = attr[1:]  # strip initial dot
        if not attr:
            return cls.maybe_humanize_list(obj)
        raw_result = "{" + result + "}"
        # forbid private members and nested attr lookups
        if attr.startswith("_") or "." in attr:
            return raw_result
        return cls.maybe_humanize_list(getattr(obj, attr, raw_result))

    @staticmethod
    def maybe_humanize_list(thing) -> str:
        if isinstance(thing, str):
            return thing
        try:
            return humanize_list(list(map(str, thing)))
        except TypeError:
            return str(thing)

    @staticmethod
    def transform_parameter(result, message) -> str:
        """
        For security reasons only specific objects are allowed
        Internals are ignored
        """
        raw_result = "{" + result + "}"
        objects = {
            "message": message,
            "author": message.author,
            "channel": message.channel,
            "guild": message.guild,
            "server": message.guild,
        }
        if result in objects:
            return str(objects[result])
        try:
            first, second = result.split(".")
        except ValueError:
            return raw_result
        if first in objects and not second.startswith("_"):
            first = objects[first]
        else:
            return raw_result
        return str(getattr(first, second, raw_result))

    async def get_command_names(self, guild: discord.Guild) -> Set[str]:
        """Get all custom command names in a guild.

        Returns
        --------
        Set[str]
            A set of all custom command names.

        """
        return set(await CommandObj.get_commands(self.config.guild(guild)))

    @staticmethod
    def prepare_command_list(
        ctx: commands.Context, command_list: Iterable[Tuple[str, dict]]
    ) -> List[Tuple[str, str]]:
        results = []
        for command, body in command_list:
            responses = body["response"]
            if isinstance(responses, list):
                result = ", ".join(responses)
            elif isinstance(responses, str):
                result = responses
            else:
                continue
            # Cut preview to 52 characters max
            if len(result) > 52:
                result = result[:49] + "..."
            # Replace newlines with spaces
            result = result.replace("\n", " ")
            # Escape markdown and mass mentions
            result = escape(result, formatting=True, mass_mentions=True)
            results.append((f"{ctx.clean_prefix}{command}", result))
        return results