mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 03:08:55 -05:00
Refactor fuzzy help and clean up help command (#2122)
What's changed: - Fixed issues mentioned on #2031 - Fuzzy help displays more like help manual - Fuzzy help is easier and more flexible to use - Fuzzy help string-matching ratio lowered to 80 - Help formatter is more extendable - Help command has been optimized, cleaned up and better incorporates fuzzy help - Added async_filter and async_enumerate utility functions because I was using them for this PR, then no longer needed them, but decided they might be useful anyway. - Added `Context.me` property which is a shortcut to `Context.guild.me` or `Context.bot.user`, depending on the channel type.
This commit is contained in:
parent
32b4c6ce86
commit
f7dbaca340
@ -1,6 +1,6 @@
|
||||
from copy import copy
|
||||
from re import search
|
||||
from typing import Generator, Tuple, Iterable
|
||||
from typing import Generator, Tuple, Iterable, Optional
|
||||
|
||||
import discord
|
||||
from redbot.core import Config, commands, checks
|
||||
@ -53,10 +53,13 @@ class Alias:
|
||||
return (AliasEntry.from_json(d, bot=self.bot) for d in (await self._aliases.entries()))
|
||||
|
||||
async def is_alias(
|
||||
self, guild: discord.Guild, alias_name: str, server_aliases: Iterable[AliasEntry] = ()
|
||||
) -> (bool, AliasEntry):
|
||||
self,
|
||||
guild: Optional[discord.Guild],
|
||||
alias_name: str,
|
||||
server_aliases: Iterable[AliasEntry] = (),
|
||||
) -> Tuple[bool, Optional[AliasEntry]]:
|
||||
|
||||
if not server_aliases:
|
||||
if not server_aliases and guild is not None:
|
||||
server_aliases = await self.unloaded_aliases(guild)
|
||||
|
||||
global_aliases = await self.unloaded_global_aliases()
|
||||
|
||||
@ -106,6 +106,36 @@ class Command(commands.Command):
|
||||
# We should expose anything which might be a bug in the converter
|
||||
raise exc
|
||||
|
||||
async def can_see(self, ctx: "Context"):
|
||||
"""Check if this command is visible in the given context.
|
||||
|
||||
In short, this will verify whether the user can run the
|
||||
command, and also whether the command is hidden or not.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx : `Context`
|
||||
The invocation context to check with.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
``True`` if this command is visible in the given context.
|
||||
|
||||
"""
|
||||
for cmd in (self, *self.parents):
|
||||
if cmd.hidden:
|
||||
return False
|
||||
try:
|
||||
can_run = await self.can_run(ctx)
|
||||
except commands.CheckFailure:
|
||||
return False
|
||||
else:
|
||||
if can_run is False:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def disable_in(self, guild: discord.Guild) -> bool:
|
||||
"""Disable this command in the given guild.
|
||||
|
||||
|
||||
@ -237,3 +237,20 @@ class Context(commands.Context):
|
||||
)
|
||||
else:
|
||||
return await self.send(message)
|
||||
|
||||
@property
|
||||
def clean_prefix(self) -> str:
|
||||
"""str: The command prefix, but a mention prefix is displayed nicer."""
|
||||
me = self.me
|
||||
return self.prefix.replace(me.mention, f"@{me.display_name}")
|
||||
|
||||
@property
|
||||
def me(self) -> discord.abc.User:
|
||||
"""discord.abc.User: The bot member or user object.
|
||||
|
||||
If the context is DM, this will be a `discord.User` object.
|
||||
"""
|
||||
if self.guild is not None:
|
||||
return self.guild.me
|
||||
else:
|
||||
return self.bot.user
|
||||
|
||||
@ -2,20 +2,20 @@ import sys
|
||||
import codecs
|
||||
import datetime
|
||||
import logging
|
||||
import traceback
|
||||
from datetime import timedelta
|
||||
from distutils.version import StrictVersion
|
||||
|
||||
import aiohttp
|
||||
import discord
|
||||
import pkg_resources
|
||||
import traceback
|
||||
from colorama import Fore, Style, init
|
||||
from pkg_resources import DistributionNotFound
|
||||
|
||||
from . import __version__, commands
|
||||
from .data_manager import storage_type
|
||||
from .utils.chat_formatting import inline, bordered
|
||||
from .utils import fuzzy_command_search
|
||||
from .utils import fuzzy_command_search, format_fuzzy_results
|
||||
|
||||
log = logging.getLogger("red")
|
||||
sentry_log = logging.getLogger("red.sentry")
|
||||
@ -197,17 +197,6 @@ def init_events(bot, cli_flags):
|
||||
if disabled_message:
|
||||
await ctx.send(disabled_message.replace("{command}", ctx.invoked_with))
|
||||
elif isinstance(error, commands.CommandInvokeError):
|
||||
# Need to test if the following still works
|
||||
"""
|
||||
no_dms = "Cannot send messages to this user"
|
||||
is_help_cmd = ctx.command.qualified_name == "help"
|
||||
is_forbidden = isinstance(error.original, discord.Forbidden)
|
||||
if is_help_cmd and is_forbidden and error.original.text == no_dms:
|
||||
msg = ("I couldn't send the help message to you in DM. Either"
|
||||
" you blocked me or you disabled DMs in this server.")
|
||||
await ctx.send(msg)
|
||||
return
|
||||
"""
|
||||
log.exception(
|
||||
"Exception in command '{}'" "".format(ctx.command.qualified_name),
|
||||
exc_info=error.original,
|
||||
@ -231,12 +220,13 @@ def init_events(bot, cli_flags):
|
||||
if not hasattr(ctx.cog, "_{0.command.cog_name}__error".format(ctx)):
|
||||
await ctx.send(inline(message))
|
||||
elif isinstance(error, commands.CommandNotFound):
|
||||
term = ctx.invoked_with + " "
|
||||
if len(ctx.args) > 1:
|
||||
term += " ".join(ctx.args[1:])
|
||||
fuzzy_result = await fuzzy_command_search(ctx, ctx.invoked_with)
|
||||
if fuzzy_result is not None:
|
||||
await ctx.maybe_send_embed(fuzzy_result)
|
||||
fuzzy_commands = await fuzzy_command_search(ctx)
|
||||
if not fuzzy_commands:
|
||||
pass
|
||||
elif await ctx.embed_requested():
|
||||
await ctx.send(embed=await format_fuzzy_results(ctx, fuzzy_commands, embed=True))
|
||||
else:
|
||||
await ctx.send(await format_fuzzy_results(ctx, fuzzy_commands, embed=False))
|
||||
elif isinstance(error, commands.CheckFailure):
|
||||
pass
|
||||
elif isinstance(error, commands.NoPrivateMessage):
|
||||
|
||||
@ -20,25 +20,24 @@ message to help page.
|
||||
e.g. format_help_for(ctx, ctx.command, "Missing required arguments")
|
||||
|
||||
discord.py 1.0.0a
|
||||
Experimental: compatibility with 0.16.8
|
||||
|
||||
Copyrights to logic of code belong to Rapptz (Danny)
|
||||
Everything else credit to SirThane#1780"""
|
||||
This help formatter contains work by Rapptz (Danny) and SirThane#1780.
|
||||
"""
|
||||
from collections import namedtuple
|
||||
from typing import List
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import discord
|
||||
from discord.ext.commands import formatter
|
||||
from discord.ext.commands import formatter as dpy_formatter
|
||||
import inspect
|
||||
import itertools
|
||||
import re
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
from . import commands
|
||||
from redbot.core.utils.chat_formatting import pagify, box
|
||||
from redbot.core.utils import fuzzy_command_search
|
||||
from .i18n import Translator
|
||||
from .utils.chat_formatting import pagify
|
||||
from .utils import fuzzy_command_search, format_fuzzy_results
|
||||
|
||||
_ = Translator("Help", __file__)
|
||||
|
||||
EMPTY_STRING = "\u200b"
|
||||
|
||||
@ -49,7 +48,7 @@ _mention_pattern = re.compile("|".join(_mentions_transforms.keys()))
|
||||
EmbedField = namedtuple("EmbedField", "name value inline")
|
||||
|
||||
|
||||
class Help(formatter.HelpFormatter):
|
||||
class Help(dpy_formatter.HelpFormatter):
|
||||
"""Formats help for commands."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
@ -57,15 +56,10 @@ class Help(formatter.HelpFormatter):
|
||||
self.command = None
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def pm_check(self, ctx):
|
||||
@staticmethod
|
||||
def pm_check(ctx):
|
||||
return isinstance(ctx.channel, discord.DMChannel)
|
||||
|
||||
@property
|
||||
def clean_prefix(self):
|
||||
maybe_member = self.context.guild.me if self.context.guild else self.context.bot.user
|
||||
pretty = f"@{maybe_member.display_name}"
|
||||
return self.context.prefix.replace(maybe_member.mention, pretty)
|
||||
|
||||
@property
|
||||
def me(self):
|
||||
return self.context.me
|
||||
@ -84,6 +78,8 @@ class Help(formatter.HelpFormatter):
|
||||
else:
|
||||
return await self.context.embed_colour()
|
||||
|
||||
colour = color
|
||||
|
||||
@property
|
||||
def destination(self):
|
||||
if self.context.bot.pm_help:
|
||||
@ -110,7 +106,7 @@ class Help(formatter.HelpFormatter):
|
||||
continue
|
||||
|
||||
if self.is_cog() or self.is_bot():
|
||||
name = "{0}{1}".format(self.clean_prefix, name)
|
||||
name = "{0}{1}".format(self.context.clean_prefix, name)
|
||||
|
||||
entries += "**{0}** {1}\n".format(name, command.short_doc)
|
||||
return entries
|
||||
@ -120,7 +116,7 @@ class Help(formatter.HelpFormatter):
|
||||
return (
|
||||
"Type {0}help <command> for more info on a command.\n"
|
||||
"You can also type {0}help <category> for more info on a category.".format(
|
||||
self.clean_prefix
|
||||
self.context.clean_prefix
|
||||
)
|
||||
)
|
||||
|
||||
@ -163,7 +159,7 @@ class Help(formatter.HelpFormatter):
|
||||
if self.command.help:
|
||||
splitted = self.command.help.split("\n\n")
|
||||
name = "__{0}__".format(splitted[0])
|
||||
value = "\n\n".join(splitted[1:]).replace("[p]", self.clean_prefix)
|
||||
value = "\n\n".join(splitted[1:]).replace("[p]", self.context.clean_prefix)
|
||||
if value == "":
|
||||
value = EMPTY_STRING
|
||||
field = EmbedField(name[:252], value[:1024], False)
|
||||
@ -213,7 +209,8 @@ class Help(formatter.HelpFormatter):
|
||||
|
||||
return emb
|
||||
|
||||
def group_fields(self, fields: List[EmbedField], max_chars=1000):
|
||||
@staticmethod
|
||||
def group_fields(fields: List[EmbedField], max_chars=1000):
|
||||
curr_group = []
|
||||
ret = []
|
||||
for f in fields:
|
||||
@ -277,158 +274,112 @@ class Help(formatter.HelpFormatter):
|
||||
|
||||
return ret
|
||||
|
||||
async def simple_embed(self, ctx, title=None, description=None, color=None):
|
||||
# Shortcut
|
||||
async def format_command_not_found(
|
||||
self, ctx: commands.Context, command_name: str
|
||||
) -> Optional[Union[str, discord.Message]]:
|
||||
"""Get the response for a user calling help on a missing command."""
|
||||
self.context = ctx
|
||||
if color is None:
|
||||
color = await self.color()
|
||||
embed = discord.Embed(title=title, description=description, color=color)
|
||||
embed.set_footer(text=ctx.bot.formatter.get_ending_note())
|
||||
embed.set_author(**self.author)
|
||||
return embed
|
||||
|
||||
async def cmd_not_found(self, ctx, cmd, description=None, color=None):
|
||||
# Shortcut for a shortcut. Sue me
|
||||
embed = await self.simple_embed(
|
||||
ctx, title="Command {} not found.".format(cmd), description=description, color=color
|
||||
return await default_command_not_found(
|
||||
ctx,
|
||||
command_name,
|
||||
use_embeds=True,
|
||||
colour=await self.colour(),
|
||||
author=self.author,
|
||||
footer={"text": self.get_ending_note()},
|
||||
)
|
||||
return embed
|
||||
|
||||
async def cmd_has_no_subcommands(self, ctx, cmd, color=None):
|
||||
embed = await self.simple_embed(
|
||||
ctx, title=ctx.bot.command_has_no_subcommands.format(cmd), color=color
|
||||
)
|
||||
return embed
|
||||
|
||||
|
||||
@commands.command(hidden=True)
|
||||
async def help(ctx, *cmds: str):
|
||||
"""Shows help documentation.
|
||||
async def help(ctx: commands.Context, *, command_name: str = ""):
|
||||
"""Show help documentation.
|
||||
|
||||
[p]**help**: Shows the help manual.
|
||||
[p]**help** command: Show help for a command
|
||||
[p]**help** Category: Show commands and description for a category"""
|
||||
destination = ctx.author if ctx.bot.pm_help else ctx
|
||||
|
||||
def repl(obj):
|
||||
return _mentions_transforms.get(obj.group(0), "")
|
||||
- `[p]help`: Show the help manual.
|
||||
- `[p]help command`: Show help for a command.
|
||||
- `[p]help Category`: Show commands and description for a category,
|
||||
"""
|
||||
bot = ctx.bot
|
||||
if bot.pm_help:
|
||||
destination = ctx.author
|
||||
else:
|
||||
destination = ctx.channel
|
||||
|
||||
use_embeds = await ctx.embed_requested()
|
||||
f = formatter.HelpFormatter()
|
||||
# help by itself just lists our own commands.
|
||||
if len(cmds) == 0:
|
||||
if use_embeds:
|
||||
embeds = await ctx.bot.formatter.format_help_for(ctx, ctx.bot)
|
||||
formatter = bot.formatter
|
||||
else:
|
||||
embeds = await f.format_help_for(ctx, ctx.bot)
|
||||
elif len(cmds) == 1:
|
||||
# try to see if it is a cog name
|
||||
name = _mention_pattern.sub(repl, cmds[0])
|
||||
command = None
|
||||
if name in ctx.bot.cogs:
|
||||
command = ctx.bot.cogs[name]
|
||||
else:
|
||||
command = ctx.bot.all_commands.get(name)
|
||||
if command is None:
|
||||
if use_embeds:
|
||||
fuzzy_result = await fuzzy_command_search(ctx, name)
|
||||
if fuzzy_result is not None:
|
||||
await destination.send(
|
||||
embed=await ctx.bot.formatter.cmd_not_found(
|
||||
ctx, name, description=fuzzy_result
|
||||
)
|
||||
)
|
||||
else:
|
||||
fuzzy_result = await fuzzy_command_search(ctx, name)
|
||||
if fuzzy_result is not None:
|
||||
await destination.send(
|
||||
ctx.bot.command_not_found.format(name, fuzzy_result)
|
||||
)
|
||||
return
|
||||
if use_embeds:
|
||||
embeds = await ctx.bot.formatter.format_help_for(ctx, command)
|
||||
else:
|
||||
embeds = await f.format_help_for(ctx, command)
|
||||
else:
|
||||
name = _mention_pattern.sub(repl, cmds[0])
|
||||
command = ctx.bot.all_commands.get(name)
|
||||
if command is None:
|
||||
if use_embeds:
|
||||
fuzzy_result = await fuzzy_command_search(ctx, name)
|
||||
if fuzzy_result is not None:
|
||||
await destination.send(
|
||||
embed=await ctx.bot.formatter.cmd_not_found(
|
||||
ctx, name, description=fuzzy_result
|
||||
)
|
||||
)
|
||||
else:
|
||||
fuzzy_result = await fuzzy_command_search(ctx, name)
|
||||
if fuzzy_result is not None:
|
||||
await destination.send(ctx.bot.command_not_found.format(name, fuzzy_result))
|
||||
return
|
||||
formatter = dpy_formatter.HelpFormatter()
|
||||
|
||||
for key in cmds[1:]:
|
||||
try:
|
||||
key = _mention_pattern.sub(repl, key)
|
||||
command = command.all_commands.get(key)
|
||||
if not command_name:
|
||||
# help by itself just lists our own commands.
|
||||
pages = await formatter.format_help_for(ctx, bot)
|
||||
else:
|
||||
command: commands.Command = bot.get_command(command_name)
|
||||
if command is None:
|
||||
if use_embeds:
|
||||
fuzzy_result = await fuzzy_command_search(ctx, name)
|
||||
if fuzzy_result is not None:
|
||||
await destination.send(
|
||||
embed=await ctx.bot.formatter.cmd_not_found(
|
||||
ctx, name, description=fuzzy_result
|
||||
)
|
||||
)
|
||||
if hasattr(formatter, "format_command_not_found"):
|
||||
msg = await formatter.format_command_not_found(ctx, command_name)
|
||||
else:
|
||||
fuzzy_result = await fuzzy_command_search(ctx, name)
|
||||
if fuzzy_result is not None:
|
||||
await destination.send(
|
||||
ctx.bot.command_not_found.format(name, fuzzy_result)
|
||||
)
|
||||
return
|
||||
except AttributeError:
|
||||
if use_embeds:
|
||||
await destination.send(
|
||||
embed=await ctx.bot.formatter.simple_embed(
|
||||
ctx,
|
||||
title='Command "{0.name}" has no subcommands.'.format(command),
|
||||
color=await ctx.bot.formatter.color(),
|
||||
)
|
||||
)
|
||||
msg = await default_command_not_found(ctx, command_name, use_embeds=use_embeds)
|
||||
pages = [msg]
|
||||
else:
|
||||
await destination.send(ctx.bot.command_has_no_subcommands.format(command))
|
||||
return
|
||||
if use_embeds:
|
||||
embeds = await ctx.bot.formatter.format_help_for(ctx, command)
|
||||
else:
|
||||
embeds = await f.format_help_for(ctx, command)
|
||||
pages = await formatter.format_help_for(ctx, command)
|
||||
|
||||
max_pages_in_guild = await ctx.bot.db.help.max_pages_in_guild()
|
||||
if len(embeds) > max_pages_in_guild:
|
||||
if len(pages) > max_pages_in_guild:
|
||||
destination = ctx.author
|
||||
if ctx.guild and not ctx.guild.me.permissions_in(ctx.channel).send_messages:
|
||||
destination = ctx.author
|
||||
try:
|
||||
for embed in embeds:
|
||||
if use_embeds:
|
||||
try:
|
||||
await destination.send(embed=embed)
|
||||
except discord.HTTPException:
|
||||
destination = ctx.author
|
||||
await destination.send(embed=embed)
|
||||
for page in pages:
|
||||
if isinstance(page, discord.Embed):
|
||||
await destination.send(embed=page)
|
||||
else:
|
||||
try:
|
||||
await destination.send(embed)
|
||||
except discord.HTTPException:
|
||||
destination = ctx.author
|
||||
await destination.send(embed)
|
||||
await destination.send(page)
|
||||
except discord.Forbidden:
|
||||
await ctx.channel.send(
|
||||
"I couldn't send the help message to you in DM. Either you blocked me or you disabled DMs in this server."
|
||||
_(
|
||||
"I couldn't send the help message to you in DM. Either you blocked me or you "
|
||||
"disabled DMs in this server."
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@help.error
|
||||
async def help_error(ctx, error):
|
||||
destination = ctx.author if ctx.bot.pm_help else ctx
|
||||
await destination.send("{0.__name__}: {1}".format(type(error), error))
|
||||
traceback.print_tb(error.original.__traceback__, file=sys.stderr)
|
||||
async def default_command_not_found(
|
||||
ctx: commands.Context, command_name: str, *, use_embeds: bool, **embed_options
|
||||
) -> Optional[Union[str, discord.Embed]]:
|
||||
"""Default function for formatting the response to a missing command."""
|
||||
ret = None
|
||||
cmds = command_name.split()
|
||||
prev_command = None
|
||||
for invoked in itertools.accumulate(cmds, lambda *args: " ".join(args)):
|
||||
command = ctx.bot.get_command(invoked)
|
||||
if command is None:
|
||||
if prev_command is not None and not isinstance(prev_command, commands.Group):
|
||||
ret = _("Command *{command_name}* has no subcommands.").format(
|
||||
command_name=prev_command.qualified_name
|
||||
)
|
||||
break
|
||||
elif not await command.can_see(ctx):
|
||||
return
|
||||
prev_command = command
|
||||
|
||||
if ret is None:
|
||||
fuzzy_commands = await fuzzy_command_search(ctx, command_name, min_score=75)
|
||||
if fuzzy_commands:
|
||||
ret = await format_fuzzy_results(ctx, fuzzy_commands, embed=use_embeds)
|
||||
else:
|
||||
ret = _("Command *{command_name}* not found.").format(command_name=command_name)
|
||||
|
||||
if use_embeds:
|
||||
if isinstance(ret, str):
|
||||
ret = discord.Embed(title=ret)
|
||||
if "colour" in embed_options:
|
||||
ret.colour = embed_options.pop("colour")
|
||||
elif "color" in embed_options:
|
||||
ret.colour = embed_options.pop("color")
|
||||
|
||||
if "author" in embed_options:
|
||||
ret.set_author(**embed_options.pop("author"))
|
||||
if "footer" in embed_options:
|
||||
ret.set_footer(**embed_options.pop("footer"))
|
||||
|
||||
return ret
|
||||
|
||||
@ -1,20 +1,42 @@
|
||||
__all__ = ["bounded_gather", "safe_delete", "fuzzy_command_search", "deduplicate_iterables"]
|
||||
|
||||
import asyncio
|
||||
from asyncio import as_completed, AbstractEventLoop, Semaphore
|
||||
from asyncio.futures import isfuture
|
||||
from itertools import chain
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from typing import Any, Awaitable, Iterator, List, Optional
|
||||
from asyncio import AbstractEventLoop, as_completed, Semaphore
|
||||
from asyncio.futures import isfuture
|
||||
from itertools import chain
|
||||
from pathlib import Path
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncIterator,
|
||||
AsyncIterable,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
TypeVar,
|
||||
Union,
|
||||
)
|
||||
|
||||
import discord
|
||||
from fuzzywuzzy import fuzz, process
|
||||
from redbot.core import commands
|
||||
from fuzzywuzzy import process
|
||||
|
||||
from .chat_formatting import box
|
||||
|
||||
__all__ = [
|
||||
"bounded_gather",
|
||||
"safe_delete",
|
||||
"fuzzy_command_search",
|
||||
"format_fuzzy_results",
|
||||
"deduplicate_iterables",
|
||||
]
|
||||
|
||||
_T = TypeVar("_T")
|
||||
|
||||
|
||||
# Benchmarked to be the fastest method.
|
||||
def deduplicate_iterables(*iterables):
|
||||
@ -26,11 +48,11 @@ def deduplicate_iterables(*iterables):
|
||||
return list(dict.fromkeys(chain.from_iterable(iterables)))
|
||||
|
||||
|
||||
def fuzzy_filter(record):
|
||||
def _fuzzy_log_filter(record):
|
||||
return record.funcName != "extractWithoutOrder"
|
||||
|
||||
|
||||
logging.getLogger().addFilter(fuzzy_filter)
|
||||
logging.getLogger().addFilter(_fuzzy_log_filter)
|
||||
|
||||
|
||||
def safe_delete(pth: Path):
|
||||
@ -47,59 +69,222 @@ def safe_delete(pth: Path):
|
||||
shutil.rmtree(str(pth), ignore_errors=True)
|
||||
|
||||
|
||||
async def filter_commands(ctx: commands.Context, extracted: list):
|
||||
return [
|
||||
i
|
||||
for i in extracted
|
||||
if i[1] >= 90
|
||||
and not i[0].hidden
|
||||
and not any([p.hidden for p in i[0].parents])
|
||||
and await i[0].can_run(ctx)
|
||||
and all([await p.can_run(ctx) for p in i[0].parents])
|
||||
]
|
||||
class AsyncFilter(AsyncIterator[_T], Awaitable[List[_T]]):
|
||||
"""Class returned by `async_filter`. See that function for details.
|
||||
|
||||
We don't recommend instantiating this class directly.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
func: Callable[[_T], Union[bool, Awaitable[bool]]],
|
||||
iterable: Union[AsyncIterable[_T], Iterable[_T]],
|
||||
) -> None:
|
||||
self.__func: Callable[[_T], Union[bool, Awaitable[bool]]] = func
|
||||
self.__iterable: Union[AsyncIterable[_T], Iterable[_T]] = iterable
|
||||
|
||||
# We assign the generator strategy based on the arguments' types
|
||||
if isinstance(iterable, AsyncIterable):
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
self.__generator_instance = self.__async_generator_async_pred()
|
||||
else:
|
||||
self.__generator_instance = self.__async_generator_sync_pred()
|
||||
elif asyncio.iscoroutinefunction(func):
|
||||
self.__generator_instance = self.__sync_generator_async_pred()
|
||||
else:
|
||||
raise TypeError("Must be either an async predicate, an async iterable, or both.")
|
||||
|
||||
async def __sync_generator_async_pred(self) -> AsyncIterator[_T]:
|
||||
for item in self.__iterable:
|
||||
if await self.__func(item):
|
||||
yield item
|
||||
|
||||
async def __async_generator_sync_pred(self) -> AsyncIterator[_T]:
|
||||
async for item in self.__iterable:
|
||||
if self.__func(item):
|
||||
yield item
|
||||
|
||||
async def __async_generator_async_pred(self) -> AsyncIterator[_T]:
|
||||
async for item in self.__iterable:
|
||||
if await self.__func(item):
|
||||
yield item
|
||||
|
||||
async def __flatten(self) -> List[_T]:
|
||||
return [item async for item in self]
|
||||
|
||||
def __await__(self):
|
||||
# Simply return the generator filled into a list
|
||||
return self.__flatten().__await__()
|
||||
|
||||
def __anext__(self) -> Awaitable[_T]:
|
||||
# This will use the generator strategy set in __init__
|
||||
return self.__generator_instance.__anext__()
|
||||
|
||||
|
||||
async def fuzzy_command_search(ctx: commands.Context, term: str):
|
||||
out = []
|
||||
def async_filter(
|
||||
func: Callable[[_T], Union[bool, Awaitable[bool]]],
|
||||
iterable: Union[AsyncIterable[_T], Iterable[_T]],
|
||||
) -> AsyncFilter[_T]:
|
||||
"""Filter an (optionally async) iterable with an (optionally async) predicate.
|
||||
|
||||
At least one of the arguments must be async.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
func : Callable[[T], Union[bool, Awaitable[bool]]]
|
||||
A function or coroutine function which takes one item of ``iterable``
|
||||
as an argument, and returns ``True`` or ``False``.
|
||||
iterable : Union[AsyncIterable[_T], Iterable[_T]]
|
||||
An iterable or async iterable which is to be filtered.
|
||||
|
||||
Raises
|
||||
------
|
||||
TypeError
|
||||
If neither of the arguments are async.
|
||||
|
||||
Returns
|
||||
-------
|
||||
AsyncFilter[T]
|
||||
An object which can either be awaited to yield a list of the filtered
|
||||
items, or can also act as an async iterator to yield items one by one.
|
||||
|
||||
"""
|
||||
return AsyncFilter(func, iterable)
|
||||
|
||||
|
||||
async def async_enumerate(
|
||||
async_iterable: AsyncIterable[_T], start: int = 0
|
||||
) -> AsyncIterator[Tuple[int, _T]]:
|
||||
"""Async iterable version of `enumerate`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
async_iterable : AsyncIterable[T]
|
||||
The iterable to enumerate.
|
||||
start : int
|
||||
The index to start from. Defaults to 0.
|
||||
|
||||
Returns
|
||||
-------
|
||||
AsyncIterator[Tuple[int, T]]
|
||||
An async iterator of tuples in the form of ``(index, item)``.
|
||||
|
||||
"""
|
||||
async for item in async_iterable:
|
||||
yield start, item
|
||||
start += 1
|
||||
|
||||
|
||||
async def fuzzy_command_search(
|
||||
ctx: commands.Context, term: Optional[str] = None, *, min_score: int = 80
|
||||
) -> Optional[List[commands.Command]]:
|
||||
"""Search for commands which are similar in name to the one invoked.
|
||||
|
||||
Returns a maximum of 5 commands which must all be at least matched
|
||||
greater than ``min_score``.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx : `commands.Context <redbot.core.commands.Context>`
|
||||
The command invocation context.
|
||||
term : Optional[str]
|
||||
The name of the invoked command. If ``None``, `Context.invoked_with`
|
||||
will be used instead.
|
||||
min_score : int
|
||||
The minimum score for matched commands to reach. Defaults to 80.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Optional[List[`commands.Command <redbot.core.commands.Command>`]]
|
||||
A list of commands which were fuzzily matched with the invoked
|
||||
command.
|
||||
|
||||
"""
|
||||
if ctx.guild is not None:
|
||||
enabled = await ctx.bot.db.guild(ctx.guild).fuzzy()
|
||||
else:
|
||||
enabled = await ctx.bot.db.fuzzy()
|
||||
|
||||
if not enabled:
|
||||
return None
|
||||
return
|
||||
|
||||
if term is None:
|
||||
term = ctx.invoked_with
|
||||
|
||||
# If the term is an alias or CC, we don't want to send a supplementary fuzzy search.
|
||||
alias_cog = ctx.bot.get_cog("Alias")
|
||||
if alias_cog is not None:
|
||||
is_alias, alias = await alias_cog.is_alias(ctx.guild, term)
|
||||
|
||||
if is_alias:
|
||||
return None
|
||||
|
||||
return
|
||||
customcom_cog = ctx.bot.get_cog("CustomCommands")
|
||||
if customcom_cog is not None:
|
||||
cmd_obj = customcom_cog.commandobj
|
||||
|
||||
try:
|
||||
ccinfo = await cmd_obj.get(ctx.message, term)
|
||||
await cmd_obj.get(ctx.message, term)
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
return None
|
||||
return
|
||||
|
||||
extracted_cmds = await filter_commands(
|
||||
ctx, process.extract(term, ctx.bot.walk_commands(), limit=5)
|
||||
# Do the scoring. `extracted` is a list of tuples in the form `(command, score)`
|
||||
extracted = process.extract(term, ctx.bot.walk_commands(), limit=5, scorer=fuzz.QRatio)
|
||||
if not extracted:
|
||||
return
|
||||
|
||||
# Filter through the fuzzy-matched commands.
|
||||
matched_commands = []
|
||||
for command, score in extracted:
|
||||
if score < min_score:
|
||||
# Since the list is in decreasing order of score, we can exit early.
|
||||
break
|
||||
if await command.can_see(ctx):
|
||||
matched_commands.append(command)
|
||||
|
||||
return matched_commands
|
||||
|
||||
|
||||
async def format_fuzzy_results(
|
||||
ctx: commands.Context,
|
||||
matched_commands: List[commands.Command],
|
||||
*,
|
||||
embed: Optional[bool] = None,
|
||||
) -> Union[str, discord.Embed]:
|
||||
"""Format the result of a fuzzy command search.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx : `commands.Context <redbot.core.commands.Context>`
|
||||
The context in which this result is being displayed.
|
||||
matched_commands : List[`commands.Command <redbot.core.commands.Command>`]
|
||||
A list of commands which have been matched by the fuzzy search, sorted
|
||||
in order of decreasing similarity.
|
||||
embed : bool
|
||||
Whether or not the result should be an embed. If set to ``None``, this
|
||||
will default to the result of `ctx.embed_requested`.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Union[str, discord.Embed]
|
||||
The formatted results.
|
||||
|
||||
"""
|
||||
if embed is not False and (embed is True or await ctx.embed_requested()):
|
||||
lines = []
|
||||
for cmd in matched_commands:
|
||||
lines.append(f"**{ctx.clean_prefix}{cmd.qualified_name}** {cmd.short_doc}")
|
||||
return discord.Embed(
|
||||
title="Perhaps you wanted one of these?",
|
||||
colour=await ctx.embed_colour(),
|
||||
description="\n".join(lines),
|
||||
)
|
||||
|
||||
if not extracted_cmds:
|
||||
return None
|
||||
|
||||
for pos, extracted in enumerate(extracted_cmds, 1):
|
||||
short = " - {}".format(extracted[0].short_doc) if extracted[0].short_doc else ""
|
||||
out.append("{0}. {1.prefix}{2.qualified_name}{3}".format(pos, ctx, extracted[0], short))
|
||||
|
||||
return box("\n".join(out), lang="Perhaps you wanted one of these?")
|
||||
else:
|
||||
lines = []
|
||||
for cmd in matched_commands:
|
||||
lines.append(f"{ctx.clean_prefix}{cmd.qualified_name} -- {cmd.short_doc}")
|
||||
return "Perhaps you wanted one of these? " + box("\n".join(lines), lang="vhdl")
|
||||
|
||||
|
||||
async def _sem_wrapper(sem, task):
|
||||
@ -124,9 +309,11 @@ def bounded_gather_iter(
|
||||
loop : asyncio.AbstractEventLoop
|
||||
The event loop to use for the semaphore and :meth:`asyncio.gather`.
|
||||
limit : Optional[`int`]
|
||||
The maximum number of concurrent tasks. Used when no ``semaphore`` is passed.
|
||||
The maximum number of concurrent tasks. Used when no ``semaphore``
|
||||
is passed.
|
||||
semaphore : Optional[:class:`asyncio.Semaphore`]
|
||||
The semaphore to use for bounding tasks. If `None`, create one using ``loop`` and ``limit``.
|
||||
The semaphore to use for bounding tasks. If `None`, create one
|
||||
using ``loop`` and ``limit``.
|
||||
|
||||
Raises
|
||||
------
|
||||
@ -173,9 +360,11 @@ def bounded_gather(
|
||||
return_exceptions : bool
|
||||
If true, gather exceptions in the result list instead of raising.
|
||||
limit : Optional[`int`]
|
||||
The maximum number of concurrent tasks. Used when no ``semaphore`` is passed.
|
||||
The maximum number of concurrent tasks. Used when no ``semaphore``
|
||||
is passed.
|
||||
semaphore : Optional[:class:`asyncio.Semaphore`]
|
||||
The semaphore to use for bounding tasks. If `None`, create one using ``loop`` and ``limit``.
|
||||
The semaphore to use for bounding tasks. If `None`, create one
|
||||
using ``loop`` and ``limit``.
|
||||
|
||||
Raises
|
||||
------
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user