from typing import Tuple, Dict, Optional, List, Union from re import findall import discord from discord.ext.commands.view import StringView # DEP-WARN from redbot.core import commands, Config from redbot.core.i18n import Translator from redbot.core.utils import AsyncIter _ = Translator("Alias", __file__) class ArgParseError(Exception): pass class AliasEntry: """An object containing all required information about an alias""" name: str command: Union[Tuple[str], str] creator: int guild: Optional[int] uses: int def __init__( self, name: str, command: Union[Tuple[str], str], creator: int, guild: Optional[int] ): super().__init__() self.name = name self.command = command self.creator = creator self.guild = guild self.uses = 0 def inc(self): """ Increases the `uses` stat by 1. :return: new use count """ self.uses += 1 return self.uses def get_extra_args_from_alias(self, message: discord.Message, prefix: str) -> str: """ When an alias is executed by a user in chat this function tries to get any extra arguments passed in with the call. Whitespace will be trimmed from both ends. :param message: :param prefix: :param alias: :return: """ known_content_length = len(prefix) + len(self.name) extra = message.content[known_content_length:] view = StringView(extra) view.skip_ws() extra = [] while not view.eof: prev = view.index word = view.get_quoted_word() if len(word) < view.index - prev: word = "".join((view.buffer[prev], word, view.buffer[view.index - 1])) extra.append(word.strip(" ")) return extra def to_json(self) -> dict: return { "name": self.name, "command": self.command, "creator": self.creator, "guild": self.guild, "uses": self.uses, } @classmethod def from_json(cls, data: dict): ret = cls(data["name"], data["command"], data["creator"], data["guild"]) ret.uses = data.get("uses", 0) return ret class AliasCache: def __init__(self, config: Config, cache_enabled: bool = True): self.config = config self._cache_enabled = cache_enabled self._loaded = False self._aliases: Dict[Optional[int], Dict[str, AliasEntry]] = {None: {}} async def anonymize_aliases(self, user_id: int): async with self.config.entries() as global_aliases: for a in global_aliases: if a.get("creator", 0) == user_id: a["creator"] = 0xDE1 if self._cache_enabled: self._aliases[None][a["name"]] = AliasEntry.from_json(a) all_guilds = await self.config.all_guilds() async for guild_id, guild_data in AsyncIter(all_guilds.items(), steps=100): for a in guild_data["entries"]: if a.get("creator", 0) == user_id: break else: continue # basically, don't build a context manager without a need. async with self.config.guild_from_id(guild_id).entries() as entry_list: for a in entry_list: if a.get("creator", 0) == user_id: a["creator"] = 0xDE1 if self._cache_enabled: self._aliases[guild_id][a["name"]] = AliasEntry.from_json(a) async def load_aliases(self): if not self._cache_enabled: self._loaded = True return for alias in await self.config.entries(): self._aliases[None][alias["name"]] = AliasEntry.from_json(alias) all_guilds = await self.config.all_guilds() async for guild_id, guild_data in AsyncIter(all_guilds.items(), steps=100): if guild_id not in self._aliases: self._aliases[guild_id] = {} for alias in guild_data["entries"]: self._aliases[guild_id][alias["name"]] = AliasEntry.from_json(alias) self._loaded = True async def get_aliases(self, ctx: commands.Context) -> List[AliasEntry]: """Returns all possible aliases with the given context""" global_aliases: List[AliasEntry] = [] server_aliases: List[AliasEntry] = [] global_aliases = await self.get_global_aliases() if ctx.guild and ctx.guild.id in self._aliases: server_aliases = await self.get_guild_aliases(ctx.guild) return global_aliases + server_aliases async def get_guild_aliases(self, guild: discord.Guild) -> List[AliasEntry]: """Returns all guild specific aliases""" aliases: List[AliasEntry] = [] if self._cache_enabled: if guild.id in self._aliases: for _, alias in self._aliases[guild.id].items(): aliases.append(alias) else: aliases = [AliasEntry.from_json(d) for d in await self.config.guild(guild).entries()] return aliases async def get_global_aliases(self) -> List[AliasEntry]: """Returns all global specific aliases""" aliases: List[AliasEntry] = [] if self._cache_enabled: for _, alias in self._aliases[None].items(): aliases.append(alias) else: aliases = [AliasEntry.from_json(d) for d in await self.config.entries()] return aliases async def get_alias( self, guild: Optional[discord.Guild], alias_name: str ) -> Optional[AliasEntry]: """Returns an AliasEntry object if the provided alias_name is a registered alias""" server_aliases: List[AliasEntry] = [] if self._cache_enabled: if alias_name in self._aliases[None]: return self._aliases[None][alias_name] if guild is not None: if guild.id in self._aliases: if alias_name in self._aliases[guild.id]: return self._aliases[guild.id][alias_name] else: if guild: server_aliases = [ AliasEntry.from_json(d) for d in await self.config.guild(guild.id).entries() ] global_aliases = [AliasEntry.from_json(d) for d in await self.config.entries()] all_aliases = global_aliases + server_aliases for alias in all_aliases: if alias.name == alias_name: return alias return None @staticmethod def format_command_for_alias(command: str) -> str: # This was present in add_alias previously # Made this into a separate method so as to reuse the same code in edit_alias indices = findall(r"{(\d*)}", command) if indices: try: indices = [int(a[0]) for a in indices] except IndexError: raise ArgParseError(_("Arguments must be specified with a number.")) low = min(indices) indices = [a - low for a in indices] high = max(indices) gaps = set(indices).symmetric_difference(range(high + 1)) if gaps: raise ArgParseError( _("Arguments must be sequential. Missing arguments: ") + ", ".join(str(i + low) for i in gaps) ) command = command.format(*(f"{{{i}}}" for i in range(-low, high + low + 1))) return command async def add_alias( self, ctx: commands.Context, alias_name: str, command: str, global_: bool = False ) -> AliasEntry: command = self.format_command_for_alias(command) if global_: alias = AliasEntry(alias_name, command, ctx.author.id, None) settings = self.config if self._cache_enabled: self._aliases[None][alias.name] = alias else: alias = AliasEntry(alias_name, command, ctx.author.id, ctx.guild.id) settings = self.config.guild(ctx.guild) if self._cache_enabled: if ctx.guild.id not in self._aliases: self._aliases[ctx.guild.id] = {} self._aliases[ctx.guild.id][alias.name] = alias async with settings.entries() as curr_aliases: curr_aliases.append(alias.to_json()) return alias async def edit_alias( self, ctx: commands.Context, alias_name: str, command: str, global_: bool = False ) -> bool: command = self.format_command_for_alias(command) if global_: settings = self.config else: settings = self.config.guild(ctx.guild) async with settings.entries() as aliases: for index, alias in enumerate(aliases): if alias["name"] == alias_name: alias_edited = AliasEntry.from_json(alias) alias_edited.command = command aliases[index] = alias_edited.to_json() if self._cache_enabled: if global_: self._aliases[None][alias_edited.name] = alias_edited else: self._aliases[ctx.guild.id][alias_edited.name] = alias_edited return True return False async def delete_alias( self, ctx: commands.Context, alias_name: str, global_: bool = False ) -> bool: if global_: settings = self.config else: settings = self.config.guild(ctx.guild) async with settings.entries() as aliases: for alias in aliases: if alias["name"] == alias_name: aliases.remove(alias) if self._cache_enabled: if global_: del self._aliases[None][alias_name] else: del self._aliases[ctx.guild.id][alias_name] return True return False