"""Permissions cog: per-command and per-cog allow/deny rules, global or per-server."""
import asyncio
import io
import textwrap
from copy import copy
from typing import Union, Optional, Dict, List, Tuple, Any, Iterator, ItemsView, Literal, cast

import discord
import yaml
from schema import And, Or, Schema, SchemaError, Optional as UseOptional
from redbot.core import checks, commands, config
from redbot.core.bot import Red
from redbot.core.i18n import Translator, cog_i18n
from redbot.core.utils import can_user_react_in
from redbot.core.utils.chat_formatting import box, error, success
from redbot.core.utils.menus import start_adding_reactions
from redbot.core.utils.predicates import ReactionPredicate, MessagePredicate

from .converters import (
    CogOrCommand,
    RuleType,
    ClearableRuleType,
    GuildUniqueObjectFinder,
    GlobalUniqueObjectFinder,
)

_ = Translator("Permissions", __file__)

COG = "COG"
COMMAND = "COMMAND"
GLOBAL = 0

_OldConfigSchema = Dict[int, Dict[str, Dict[str, Dict[str, Dict[str, List[int]]]]]]
_NewConfigSchema = Dict[str, Dict[int, Dict[str, Dict[int, bool]]]]

# The strings in the schema are constants and should get extracted, but not translated until
# runtime.
translate = _
_ = lambda s: s
YAML_SCHEMA = Schema(
    Or(
        {
            UseOptional(COMMAND): Or(
                {
                    Or(str, int): Or(
                        {
                            Or(int, "default"): And(
                                bool, error=_("Rules must be either `true` or `false`.")
                            )
                        },
                        {},
                        error=_("Keys under command names must be IDs (numbers) or `default`."),
                    )
                },
                {},
                error=_("Keys under `COMMAND` must be command names (strings)."),
            ),
            UseOptional(COG): Or(
                {
                    Or(str, int): Or(
                        {
                            Or(int, "default"): And(
                                bool, error=_("Rules must be either `true` or `false`.")
                            )
                        },
                        {},
                        error=_("Keys under cog names must be IDs or `default`."),
                    )
                },
                {},
                error=_("Keys under `COG` must be cog names (strings)."),
            ),
        },
        {},
        error=_("Top-level keys must be either `COG` or `COMMAND`."),
    )
)
_ = translate

__version__ = "1.0.0"


@cog_i18n(_)
class Permissions(commands.Cog):
    """Customise permissions for commands and cogs."""

    # The command groups in this cog should never directly take any configuration actions
    # These should be delegated to specific commands so that it remains trivial
    # to prevent the guild owner from ever locking themselves out
    # see ``Permissions.__permissions_hook`` for more details

    def __init__(self, bot: Red):
        super().__init__()
        self.bot = bot

        # Config Schema:
        # "COG"
        # -> Cog names...
        #   -> Guild IDs...
        #     -> Model IDs...
        #       -> True|False
        #     -> "default"
        #       -> True|False
        # "COMMAND"
        # -> Command names...
        #   -> Guild IDs...
        #     -> Model IDs...
        #       -> True|False
        #     -> "default"
        #       -> True|False

        # Note that GLOBAL rules are denoted by an ID of 0.
        self.config = config.Config.get_conf(self, identifier=78631113035100160)
        self.config.register_global(version="")
        self.config.init_custom(COG, 1)
        self.config.register_custom(COG)
        self.config.init_custom(COMMAND, 1)
        self.config.register_custom(COMMAND)

    async def red_delete_data_for_user(
        self,
        *,
        requester: Literal["discord_deleted_user", "owner", "user", "user_strict"],
        user_id: int,
    ):
        """Remove every stored rule keyed by ``user_id``.

        Only acts when ``requester`` is ``"discord_deleted_user"``; any other
        requester returns immediately without touching stored rules.
        """
        if requester != "discord_deleted_user":
            return

        count = 0

        async def _tick() -> None:
            # Yield to the event loop every 100 iterations so a large rule set
            # does not monopolise it.
            nonlocal count
            count += 1
            if not count % 100:
                await asyncio.sleep(0)

        _uid = str(user_id)

        # The dict as returned here has string keys. Above is for comparison,
        # there's a below recast to int where needed for guild ids

        for typename, getter in ((COG, self.bot.get_cog), (COMMAND, self.bot.get_command)):
            obj_type_rules = await self.config.custom(typename).all()
            await _tick()
            for obj_name, rules_dict in obj_type_rules.items():
                await _tick()
                obj = getter(obj_name)
                for guild_id, guild_rules in rules_dict.items():
                    await _tick()
                    if _uid not in guild_rules:
                        continue
                    if obj:
                        # delegate to remove rule here so the live object is updated too
                        await self._remove_rule(
                            CogOrCommand(typename, obj.qualified_name, obj),
                            user_id,
                            int(guild_id),
                        )
                    else:
                        # Cog/command is not loaded: only the stored rule can be cleared.
                        grp = self.config.custom(typename, obj_name)
                        await grp.clear_raw(guild_id, user_id)

    async def __permissions_hook(self, ctx: commands.Context) -> Optional[bool]:
        """
        Purpose of this hook is to prevent guild owner lockouts of permissions specifically
        without modifying rule behavior in any other case.

        Guild owner is not special cased outside of these configuration commands
        to allow guild owner to restrict the use of potentially damaging commands
        such as, but not limited to, cleanup to specific channels.

        Leaving the configuration commands special cased allows guild owners to fix
        any misconfigurations.
        """
        if ctx.guild and ctx.author == ctx.guild.owner:
            # the below should contain all commands from this cog
            # which configure or are useful to the
            # configuration of guild permissions and should never
            # have a potential impact on global configuration
            # as well as the parent groups
            if ctx.command in (
                self.permissions,  # main top level group
                self.permissions_acl,  # acl group
                self.permissions_acl_getguild,
                self.permissions_acl_setguild,
                self.permissions_acl_updateguild,
                self.permissions_addguildrule,
                self.permissions_clearguildrules,
                self.permissions_removeguildrule,
                self.permissions_setdefaultguildrule,
                self.permissions_canrun,
                self.permissions_explain,
            ):
                return True  # permission rules will be ignored at this case

        # this delegates to permissions rules, do not change to False which would deny
        return None

    @commands.group()
    async def permissions(self, ctx: commands.Context):
        """Command permission management tools."""
        pass

    @permissions.command(name="explain")
    async def permissions_explain(self, ctx: commands.Context):
        """Explain how permissions works."""
        # Apologies in advance for the translators out there...

        message = _(
            "This cog extends the default permission model of the bot. By default, many commands "
            "are restricted based on what the command can do.\n"
            "This cog allows you to refine some of those restrictions. You can allow wider or "
            "narrower access to most commands using it. You cannot, however, change the "
            "restrictions on owner-only commands.\n\n"
            "When additional rules are set using this cog, those rules will be checked prior to "
            "checking for the default restrictions of the command.\n"
            "Global rules (set by the owner) are checked first, then rules set for servers. If "
            "multiple global or server rules apply to the case, the order they are checked in is:\n"
            " 1. Rules about a user.\n"
            " 2. Rules about the voice channel a user is connected to.\n"
            " 3. Rules about the channel or a parent of the thread a command was issued in.\n"
            " 4. Rules about a role the user has (The highest role they have with a rule will be "
            "used).\n"
            " 5. Rules about the server a user is in (Global rules only).\n\n"
            "For more details, please read the [official documentation]"
            "(https://docs.discord.red/en/stable/cog_permissions.html)."
        )

        await ctx.maybe_send_embed(message)

    @permissions.command(name="canrun")
    async def permissions_canrun(
        self, ctx: commands.Context, user: discord.Member, *, command: str
    ):
        """Check if a user can run a command.

        This will take the current context into account, such as the
        server and text channel.
        """

        if not command:
            return await ctx.send_help()

        # Re-run the check machinery against a copy of the invoking message
        # that pretends to have been sent by ``user``.
        fake_message = copy(ctx.message)
        fake_message.author = user
        fake_message.content = f"{ctx.prefix}{command}"

        com = ctx.bot.get_command(command)
        if com is None:
            out = _("No such command")
        else:
            fake_context = await ctx.bot.get_context(fake_message)
            try:
                can = await com.can_run(
                    fake_context, check_all_parents=True, change_permission_state=False
                )
            except commands.CommandError:
                can = False

            out = (
                success(_("That user can run the specified command."))
                if can
                else error(_("That user can not run the specified command."))
            )
        await ctx.send(out)

    @checks.guildowner_or_permissions(administrator=True)
    @permissions.group(name="acl", aliases=["yaml"])
    async def permissions_acl(self, ctx: commands.Context):
        """Manage permissions with YAML files."""

    @permissions_acl.command(name="yamlexample")
    async def permissions_acl_yaml_example(self, ctx: commands.Context):
        """Sends an example of the yaml layout for permissions"""
        await ctx.send(
            _("Example YAML for setting rules:\n")
            + box(
                textwrap.dedent(
                    """\
                    COMMAND:
                        ping:
                            12345678901234567: true
                            56789012345671234: false
                    COG:
                        General:
                            56789012345671234: true
                            12345678901234567: false
                            default: false
                    """
                ),
                lang="yaml",
            )
        )

    @checks.is_owner()
    @permissions_acl.command(name="setglobal")
    async def permissions_acl_setglobal(self, ctx: commands.Context):
        """Set global rules with a YAML file.

        **WARNING**: This will override reset *all* global rules
        to the rules specified in the uploaded file.

        This does not validate the names of commands and cogs before
        setting the new rules.
        """
        await self._permissions_acl_set(ctx, guild_id=GLOBAL, update=False)

    @commands.guild_only()
    @checks.guildowner_or_permissions(administrator=True)
    @permissions_acl.command(name="setserver", aliases=["setguild"])
    async def permissions_acl_setguild(self, ctx: commands.Context):
        """Set rules for this server with a YAML file.

        **WARNING**: This will override reset *all* rules in this
        server to the rules specified in the uploaded file.
        """
        await self._permissions_acl_set(ctx, guild_id=ctx.guild.id, update=False)

    @checks.is_owner()
    @permissions_acl.command(name="getglobal")
    async def permissions_acl_getglobal(self, ctx: commands.Context):
        """Get a YAML file detailing all global rules."""
        file = await self._yaml_get_acl(guild_id=GLOBAL)
        try:
            await ctx.author.send(file=file)
        except discord.Forbidden:
            await ctx.send(_("I'm not allowed to DM you."))
        else:
            # No need to announce the DM when the command was itself run in DMs.
            if ctx.guild is not None:
                await ctx.send(_("I've just sent the file to you via DM."))
        finally:
            file.close()

    @commands.guild_only()
    @checks.guildowner_or_permissions(administrator=True)
    @permissions_acl.command(name="getserver", aliases=["getguild"])
    async def permissions_acl_getguild(self, ctx: commands.Context):
        """Get a YAML file detailing all rules in this server."""
        file = await self._yaml_get_acl(guild_id=ctx.guild.id)
        try:
            await ctx.author.send(file=file)
        except discord.Forbidden:
            await ctx.send(_("I'm not allowed to DM you."))
        else:
            await ctx.send(_("I've just sent the file to you via DM."))
        finally:
            file.close()

    @checks.is_owner()
    @permissions_acl.command(name="updateglobal")
    async def permissions_acl_updateglobal(self, ctx: commands.Context):
        """Update global rules with a YAML file.

        This won't touch any rules not specified in the YAML
        file.
        """
        await self._permissions_acl_set(ctx, guild_id=GLOBAL, update=True)

    @commands.guild_only()
    @checks.guildowner_or_permissions(administrator=True)
    @permissions_acl.command(name="updateserver", aliases=["updateguild"])
    async def permissions_acl_updateguild(self, ctx: commands.Context):
        """Update rules for this server with a YAML file.

        This won't touch any rules not specified in the YAML
        file.
        """
        await self._permissions_acl_set(ctx, guild_id=ctx.guild.id, update=True)

    @checks.is_owner()
    @permissions.command(name="addglobalrule", require_var_positional=True)
    async def permissions_addglobalrule(
        self,
        ctx: commands.Context,
        allow_or_deny: RuleType,
        cog_or_command: CogOrCommand,
        *who_or_what: GlobalUniqueObjectFinder,
    ):
        """Add a global rule to a command.

        `<allow_or_deny>` should be one of "allow" or "deny".

        `<cog_or_command>` is the cog or command to add the rule to.
        This is case sensitive.

        `<who_or_what...>` is one or more users, channels or roles the rule is for.
        """
        for w in who_or_what:
            await self._add_rule(
                rule=cast(bool, allow_or_deny),
                cog_or_cmd=cog_or_command,
                model_id=w.id,
                guild_id=0,
            )
        await ctx.send(_("Rule added."))

    @commands.guild_only()
    @checks.guildowner_or_permissions(administrator=True)
    @permissions.command(
        name="addserverrule", aliases=["addguildrule"], require_var_positional=True
    )
    async def permissions_addguildrule(
        self,
        ctx: commands.Context,
        allow_or_deny: RuleType,
        cog_or_command: CogOrCommand,
        *who_or_what: GuildUniqueObjectFinder,
    ):
        """Add a rule to a command in this server.

        `<allow_or_deny>` should be one of "allow" or "deny".

        `<cog_or_command>` is the cog or command to add the rule to.
        This is case sensitive.

        `<who_or_what...>` is one or more users, channels or roles the rule is for.
        """
        for w in who_or_what:
            await self._add_rule(
                rule=cast(bool, allow_or_deny),
                cog_or_cmd=cog_or_command,
                model_id=w.id,
                guild_id=ctx.guild.id,
            )
        await ctx.send(_("Rule added."))

    @checks.is_owner()
    @permissions.command(name="removeglobalrule", require_var_positional=True)
    async def permissions_removeglobalrule(
        self,
        ctx: commands.Context,
        cog_or_command: CogOrCommand,
        *who_or_what: GlobalUniqueObjectFinder,
    ):
        """Remove a global rule from a command.

        `<cog_or_command>` is the cog or command to remove the rule from.
        This is case sensitive.

        `<who_or_what...>` is one or more users, channels or roles the rule is for.
        """
        for w in who_or_what:
            await self._remove_rule(cog_or_cmd=cog_or_command, model_id=w.id, guild_id=GLOBAL)
        await ctx.send(_("Rule removed."))

    # NOTE(review): this server-scoped command resolves targets with
    # GlobalUniqueObjectFinder while ``addserverrule`` uses GuildUniqueObjectFinder.
    # Presumably deliberate (removal of rules for objects no longer in the server);
    # confirm before changing.
    @commands.guild_only()
    @checks.guildowner_or_permissions(administrator=True)
    @permissions.command(
        name="removeserverrule", aliases=["removeguildrule"], require_var_positional=True
    )
    async def permissions_removeguildrule(
        self,
        ctx: commands.Context,
        cog_or_command: CogOrCommand,
        *who_or_what: GlobalUniqueObjectFinder,
    ):
        """Remove a server rule from a command.

        `<cog_or_command>` is the cog or command to remove the rule from.
        This is case sensitive.

        `<who_or_what...>` is one or more users, channels or roles the rule is for.
        """
        for w in who_or_what:
            await self._remove_rule(
                cog_or_cmd=cog_or_command, model_id=w.id, guild_id=ctx.guild.id
            )
        await ctx.send(_("Rule removed."))

    @commands.guild_only()
    @checks.guildowner_or_permissions(administrator=True)
    @permissions.command(name="setdefaultserverrule", aliases=["setdefaultguildrule"])
    async def permissions_setdefaultguildrule(
        self, ctx: commands.Context, allow_or_deny: ClearableRuleType, cog_or_command: CogOrCommand
    ):
        """Set the default rule for a command in this server.

        This is the rule a command will default to when no other rule
        is found.

        `<allow_or_deny>` should be one of "allow", "deny" or "clear".
        "clear" will reset the default rule.

        `<cog_or_command>` is the cog or command to set the default
        rule for. This is case sensitive.
        """
        await self._set_default_rule(
            rule=cast(Optional[bool], allow_or_deny),
            cog_or_cmd=cog_or_command,
            guild_id=ctx.guild.id,
        )
        await ctx.send(_("Default set."))

    @checks.is_owner()
    @permissions.command(name="setdefaultglobalrule")
    async def permissions_setdefaultglobalrule(
        self, ctx: commands.Context, allow_or_deny: ClearableRuleType, cog_or_command: CogOrCommand
    ):
        """Set the default global rule for a command.

        This is the rule a command will default to when no other rule
        is found.

        `<allow_or_deny>` should be one of "allow", "deny" or "clear".
        "clear" will reset the default rule.

        `<cog_or_command>` is the cog or command to set the default
        rule for. This is case sensitive.
        """
        await self._set_default_rule(
            rule=cast(Optional[bool], allow_or_deny), cog_or_cmd=cog_or_command, guild_id=GLOBAL
        )
        await ctx.send(_("Default set."))

    @checks.is_owner()
    @permissions.command(name="clearglobalrules")
    async def permissions_clearglobalrules(self, ctx: commands.Context):
        """Reset all global rules."""
        agreed = await self._confirm(ctx)
        if agreed:
            await self._clear_rules(guild_id=GLOBAL)
            await ctx.tick()

    @commands.guild_only()
    @checks.guildowner_or_permissions(administrator=True)
    @permissions.command(name="clearserverrules", aliases=["clearguildrules"])
    async def permissions_clearguildrules(self, ctx: commands.Context):
        """Reset all rules in this server."""
        agreed = await self._confirm(ctx)
        if agreed:
            await self._clear_rules(guild_id=ctx.guild.id)
            await ctx.tick()

    @commands.Cog.listener()
    async def on_cog_add(self, cog: commands.Cog) -> None:
        """Event listener for `cog_add`.

        This loads rules whenever a new cog is added.
        """
        if cog is self:
            # This cog has its rules loaded manually in setup()
            return
        await self._on_cog_add(cog)

    @commands.Cog.listener()
    async def on_command_add(self, command: commands.Command) -> None:
        """Event listener for `command_add`.

        This loads rules whenever a new command is added.
        """
        if command.cog is self:
            # This cog's commands have their rules loaded manually in setup()
            return
        await self._on_command_add(command)

    async def _on_cog_add(self, cog: commands.Cog) -> None:
        """Load stored rules onto ``cog`` and set its ``requires.ready_event``."""
        self._load_rules_for(
            cog_or_command=cog,
            rule_dict=await self.config.custom(COG, cog.__class__.__name__).all(),
        )
        cog.requires.ready_event.set()

    async def _on_command_add(self, command: commands.Command) -> None:
        """Load stored rules onto ``command`` and set its ``requires.ready_event``."""
        self._load_rules_for(
            cog_or_command=command,
            rule_dict=await self.config.custom(COMMAND, command.qualified_name).all(),
        )
        command.requires.ready_event.set()

    async def _add_rule(
        self, rule: bool, cog_or_cmd: CogOrCommand, model_id: int, guild_id: int
    ) -> None:
        """Add a rule.

        Guild ID should be 0 for global rules.

        Handles config.
        """
        if rule is True:
            cog_or_cmd.obj.allow_for(model_id, guild_id=guild_id)
        else:
            cog_or_cmd.obj.deny_to(model_id, guild_id=guild_id)

        # Stored keys are strings, hence the str() casts.
        async with self.config.custom(cog_or_cmd.type, cog_or_cmd.name).all() as rules:
            rules.setdefault(str(guild_id), {})[str(model_id)] = rule

    async def _remove_rule(self, cog_or_cmd: CogOrCommand, model_id: int, guild_id: int) -> None:
        """Remove a rule.

        Guild ID should be 0 for global rules.

        Handles config.
        """
        cog_or_cmd.obj.clear_rule_for(model_id, guild_id=guild_id)
        guild_id, model_id = str(guild_id), str(model_id)
        async with self.config.custom(cog_or_cmd.type, cog_or_cmd.name).all() as rules:
            if (guild_rules := rules.get(guild_id)) is not None:
                guild_rules.pop(model_id, None)

    async def _set_default_rule(
        self, rule: Optional[bool], cog_or_cmd: CogOrCommand, guild_id: int
    ) -> None:
        """Set the default rule.

        Guild ID should be 0 for the global default.

        Handles config.
        """
        cog_or_cmd.obj.set_default_rule(rule, guild_id)
        async with self.config.custom(cog_or_cmd.type, cog_or_cmd.name).all() as rules:
            rules.setdefault(str(guild_id), {})["default"] = rule

    async def _clear_rules(self, guild_id: int) -> None:
        """Clear all global rules or rules for a guild.

        Guild ID should be 0 for global rules.

        Handles config.
        """
        self.bot.clear_permission_rules(guild_id, preserve_default_rule=False)
        for category in (COG, COMMAND):
            async with self.config.custom(category).all() as all_rules:
                for rules in all_rules.values():
                    rules.pop(str(guild_id), None)

    async def _permissions_acl_set(
        self, ctx: commands.Context, guild_id: int, update: bool
    ) -> None:
        """Set rules from a YAML file and handle response to users too."""
        if not ctx.message.attachments:
            await ctx.send(_("Supply a file with next message or type anything to cancel."))
            try:
                message = await ctx.bot.wait_for(
                    "message", check=MessagePredicate.same_context(ctx), timeout=30
                )
            except asyncio.TimeoutError:
                await ctx.send(_("You took too long to upload a file."))
                return
            if not message.attachments:
                await ctx.send(_("You have cancelled the upload process."))
                return
            parsedfile = message.attachments[0]
        else:
            parsedfile = ctx.message.attachments[0]

        try:
            await self._yaml_set_acl(parsedfile, guild_id=guild_id, update=update)
        except yaml.YAMLError as e:
            # Catch the base class: not every PyYAML error is a MarkedYAMLError
            # (e.g. reader errors raised for undecodable/binary uploads).
            await ctx.send(_("Invalid syntax: ") + str(e))
        except SchemaError as e:
            await ctx.send(
                _("Your YAML file did not match the schema: ") + translate(e.errors[-1])
            )
        else:
            await ctx.send(_("Rules set."))

    async def _yaml_set_acl(self, source: discord.Attachment, guild_id: int, update: bool) -> None:
        """Set rules from a YAML file.

        Raises whatever ``yaml.safe_load`` or ``YAML_SCHEMA.validate`` raise for
        a malformed upload; nothing is changed in that case because validation
        happens before any rule is cleared or written.
        """
        with io.BytesIO() as fp:
            await source.save(fp)
            rules = yaml.safe_load(fp)

        if rules is None:
            # An empty document parses to None; treat it as "no rules".
            rules = {}
        YAML_SCHEMA.validate(rules)
        if update is False:
            await self._clear_rules(guild_id)

        # NOTE(review): in update mode the stored mapping for each listed
        # command/cog is replaced wholesale by ``set_raw`` while rules already
        # loaded on the live object for unlisted models are left in place --
        # presumably acceptable, but confirm this is the intended semantics.
        for category, getter in ((COG, self.bot.get_cog), (COMMAND, self.bot.get_command)):
            rules_dict = rules.get(category)
            if not rules_dict:
                continue
            conf = self.config.custom(category)
            for cmd_name, cmd_rules in rules_dict.items():
                cmd_rules = {str(model_id): rule for model_id, rule in cmd_rules.items()}
                await conf.set_raw(cmd_name, str(guild_id), value=cmd_rules)
                cmd_obj = getter(str(cmd_name))
                if cmd_obj is not None:
                    self._load_rules_for(cmd_obj, {guild_id: cmd_rules})

    async def _yaml_get_acl(self, guild_id: int) -> discord.File:
        """Get a YAML file for all rules set in a guild."""
        guild_rules = {}
        for category in (COG, COMMAND):
            guild_rules.setdefault(category, {})
            rules_dict = await self.config.custom(category).all()
            for cmd_name, cmd_rules in rules_dict.items():
                model_rules = cmd_rules.get(str(guild_id))
                if model_rules is not None:
                    guild_rules[category][cmd_name] = dict(_int_key_map(model_rules.items()))

        fp = io.BytesIO(yaml.dump(guild_rules, default_flow_style=False).encode("utf-8"))
        return discord.File(fp, filename="acl.yaml")

    @staticmethod
    async def _confirm(ctx: commands.Context) -> bool:
        """Ask "Are you sure?" and get the response as a bool."""
        if ctx.guild is None or can_user_react_in(ctx.guild.me, ctx.channel):
            msg = await ctx.send(_("Are you sure?"))
            # noinspection PyAsyncCall
            task = start_adding_reactions(msg, ReactionPredicate.YES_OR_NO_EMOJIS)
            pred = ReactionPredicate.yes_or_no(msg, ctx.author)
            try:
                await ctx.bot.wait_for("reaction_add", check=pred, timeout=30)
            except asyncio.TimeoutError:
                await ctx.send(_("Response timed out."))
                return False
            else:
                agreed = pred.result
            finally:
                # Stop adding reactions on every exit path (including timeout)
                # before removing the prompt.
                task.cancel()
                try:
                    await msg.delete()
                except discord.NotFound:
                    # The prompt was already deleted by someone else; nothing to clean up.
                    pass
        else:
            await ctx.send(_("Are you sure?") + " (yes/no)")
            pred = MessagePredicate.yes_or_no(ctx)
            try:
                await ctx.bot.wait_for("message", check=pred, timeout=30)
            except asyncio.TimeoutError:
                await ctx.send(_("Response timed out."))
                return False
            else:
                agreed = pred.result

        if agreed is False:
            await ctx.send(_("Action cancelled."))
        return agreed

    async def initialize(self) -> None:
        """Initialize this cog.

        This will load all rules from config onto every currently
        loaded command.
        """
        await self._maybe_update_schema()
        await self._load_all_rules()

    async def _maybe_update_schema(self) -> None:
        """Maybe update rules set by config prior to permissions 1.0.0."""
        if await self.config.version():
            # A version is already stored, so the migration has been done.
            return
        old_config = await self.config.all_guilds()
        old_config[GLOBAL] = await self.config.all()
        new_cog_rules, new_cmd_rules = self._get_updated_schema(old_config)
        await self.config.custom(COG).set(new_cog_rules)
        await self.config.custom(COMMAND).set(new_cmd_rules)
        await self.config.version.set(__version__)

    @staticmethod
    def _get_updated_schema(
        old_config: _OldConfigSchema,
    ) -> Tuple[_NewConfigSchema, _NewConfigSchema]:
        """Convert pre-1.0.0 rules into ``(cog_rules, command_rules)`` in the new layout."""
        # Prior to 1.0.0, the schema was in this form for both global
        # and guild-based rules:
        # "owner_models"
        # -> "cogs"
        #   -> Cog names...
        #     -> "allow"
        #       -> [Model IDs...]
        #     -> "deny"
        #       -> [Model IDs...]
        #     -> "default"
        #       -> "allow"|"deny"
        # -> "commands"
        #   -> Command names...
        #     -> "allow"
        #       -> [Model IDs...]
        #     -> "deny"
        #       -> [Model IDs...]
        #     -> "default"
        #       -> "allow"|"deny"

        new_cog_rules = {}
        new_cmd_rules = {}
        for guild_id, old_rules in old_config.items():
            if "owner_models" not in old_rules:
                continue
            old_rules = old_rules["owner_models"]
            for category, new_rules in zip(("cogs", "commands"), (new_cog_rules, new_cmd_rules)):
                if category in old_rules:
                    for name, rules in old_rules[category].items():
                        these_rules = new_rules.setdefault(name, {})
                        guild_rules = these_rules.setdefault(str(guild_id), {})
                        # Since allow rules would take precedence if the same model ID
                        # sat in both the allow and deny list, we add the deny entries
                        # first and let any conflicting allow entries overwrite.
                        for model_id in rules.get("deny", []):
                            guild_rules[str(model_id)] = False
                        for model_id in rules.get("allow", []):
                            guild_rules[str(model_id)] = True
                        if "default" in rules:
                            default = rules["default"]
                            if default == "allow":
                                guild_rules["default"] = True
                            elif default == "deny":
                                guild_rules["default"] = False
        return new_cog_rules, new_cmd_rules

    async def _load_all_rules(self):
        """Load all of this cog's rules into loaded commands and cogs."""
        for category, getter in ((COG, self.bot.get_cog), (COMMAND, self.bot.get_command)):
            all_rules = await self.config.custom(category).all()
            for name, rules in all_rules.items():
                obj = getter(name)
                if obj is None:
                    # Rules are kept in config for cogs/commands that are not loaded.
                    continue
                self._load_rules_for(obj, rules)

    @staticmethod
    def _load_rules_for(
        cog_or_command: Union[commands.Command, commands.Cog],
        rule_dict: Dict[Union[int, str], Dict[Union[int, str], bool]],
    ) -> None:
        """Load the rules into a command or cog object.

        rule_dict should be a dict mapping Guild IDs to Model IDs to
        rules.
        """
        for guild_id, guild_dict in _int_key_map(rule_dict.items()):
            for model_id, rule in _int_key_map(guild_dict.items()):
                if model_id == "default":
                    cog_or_command.set_default_rule(rule, guild_id=guild_id)
                elif rule is True:
                    cog_or_command.allow_for(model_id, guild_id=guild_id)
                elif rule is False:
                    cog_or_command.deny_to(model_id, guild_id=guild_id)

    async def cog_unload(self) -> None:
        """Unload this cog's rules from every loaded cog and command."""
        await self._unload_all_rules()

    async def _unload_all_rules(self) -> None:
        """Unload all rules set by this cog.

        This is done instead of just clearing all rules, which could
        clear rules set by other cogs.
        """
        for category, getter in ((COG, self.bot.get_cog), (COMMAND, self.bot.get_command)):
            all_rules = await self.config.custom(category).all()
            for name, rules in all_rules.items():
                obj = getter(name)
                if obj is None:
                    continue
                self._unload_rules_for(obj, rules)

    @staticmethod
    def _unload_rules_for(
        cog_or_command: Union[commands.Command, commands.Cog],
        rule_dict: Dict[Union[int, str], Dict[Union[int, str], bool]],
    ) -> None:
        """Unload the rules from a command or cog object.

        rule_dict should be a dict mapping Guild IDs to Model IDs to
        rules.
        """
        for guild_id, guild_dict in _int_key_map(rule_dict.items()):
            for model_id in guild_dict:
                if model_id == "default":
                    cog_or_command.set_default_rule(None, guild_id=guild_id)
                else:
                    cog_or_command.clear_rule_for(int(model_id), guild_id=guild_id)


def _int_key_map(items_view: ItemsView[str, Any]) -> Iterator[Tuple[Union[str, int], Any]]:
    """Yield ``(key, value)`` pairs with every key cast to ``int`` except ``"default"``."""
    for k, v in items_view:
        if k == "default":
            yield k, v
        else:
            yield int(k), v