mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-21 18:27:59 -05:00
Rename bot.db as bot._config (#2967)
* Rename `bot.db` as `bot._config` - Continues work towards strong version guarantees - Added methods for cog use for a few things which were previously only accessible via direct access. - Retained private use in a few internal use locations, though most methods were updated away from this. - Updated documentation for shared api token users * changelog * more detail * docstring fixes * Apparently, I forgot to commit something I had locally - + a copy/paste failue in the changelog * *sigh*: * *sigh*
This commit is contained in:
@@ -6,7 +6,7 @@ from collections import Counter
|
||||
from enum import Enum
|
||||
from importlib.machinery import ModuleSpec
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union, List
|
||||
from typing import Optional, Union, List, Dict
|
||||
|
||||
import discord
|
||||
from discord.ext.commands import when_mentioned_or
|
||||
@@ -36,11 +36,11 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
|
||||
def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs):
|
||||
self._shutdown_mode = ExitCodes.CRITICAL
|
||||
self.db = Config.get_core_conf(force_registration=True)
|
||||
self._config = Config.get_core_conf(force_registration=False)
|
||||
self._co_owners = cli_flags.co_owner
|
||||
self.rpc_enabled = cli_flags.rpc
|
||||
self._last_exception = None
|
||||
self.db.register_global(
|
||||
self._config.register_global(
|
||||
token=None,
|
||||
prefix=[],
|
||||
packages=[],
|
||||
@@ -69,7 +69,7 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
schema_version=0,
|
||||
)
|
||||
|
||||
self.db.register_guild(
|
||||
self._config.register_guild(
|
||||
prefix=[],
|
||||
whitelist=[],
|
||||
blacklist=[],
|
||||
@@ -82,19 +82,19 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
autoimmune_ids=[],
|
||||
)
|
||||
|
||||
self.db.register_user(embeds=None)
|
||||
self._config.register_user(embeds=None)
|
||||
|
||||
self.db.init_custom(CUSTOM_GROUPS, 2)
|
||||
self.db.register_custom(CUSTOM_GROUPS)
|
||||
self._config.init_custom(CUSTOM_GROUPS, 2)
|
||||
self._config.register_custom(CUSTOM_GROUPS)
|
||||
|
||||
async def prefix_manager(bot, message):
|
||||
if not cli_flags.prefix:
|
||||
global_prefix = await bot.db.prefix()
|
||||
global_prefix = await bot._config.prefix()
|
||||
else:
|
||||
global_prefix = cli_flags.prefix
|
||||
if message.guild is None:
|
||||
return global_prefix
|
||||
server_prefix = await bot.db.guild(message.guild).prefix()
|
||||
server_prefix = await bot._config.guild(message.guild).prefix()
|
||||
if cli_flags.mentionable:
|
||||
return (
|
||||
when_mentioned_or(*server_prefix)(bot, message)
|
||||
@@ -117,10 +117,10 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
if "command_not_found" not in kwargs:
|
||||
kwargs["command_not_found"] = "Command {} not found.\n{}"
|
||||
|
||||
self.counter = Counter()
|
||||
self.uptime = None
|
||||
self.checked_time_accuracy = None
|
||||
self.color = discord.Embed.Empty # This is needed or color ends up 0x000000
|
||||
self._counter = Counter()
|
||||
self._uptime = None
|
||||
self._checked_time_accuracy = None
|
||||
self._color = discord.Embed.Empty # This is needed or color ends up 0x000000
|
||||
|
||||
self.main_dir = bot_dir
|
||||
|
||||
@@ -134,16 +134,42 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
|
||||
self._permissions_hooks: List[commands.CheckPredicate] = []
|
||||
|
||||
async def maybe_update_config(self):
|
||||
async def get_embed_color(self, location: discord.abc.Messageable) -> discord.Color:
|
||||
"""
|
||||
Get the embed color for a location.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
location : `discord.abc.Messageable`
|
||||
|
||||
Returns
|
||||
-------
|
||||
discord.Color
|
||||
"""
|
||||
|
||||
guild = getattr(location, "guild", None)
|
||||
|
||||
if (
|
||||
guild
|
||||
and await self._config.guild(guild).use_bot_color()
|
||||
and not isinstance(location, discord.Member)
|
||||
):
|
||||
return guild.me.color
|
||||
|
||||
return self._color
|
||||
|
||||
get_embed_colour = get_embed_color
|
||||
|
||||
async def _maybe_update_config(self):
|
||||
"""
|
||||
This should be run prior to loading cogs or connecting to discord.
|
||||
"""
|
||||
schema_version = await self.db.schema_version()
|
||||
schema_version = await self._config.schema_version()
|
||||
|
||||
if schema_version == 0:
|
||||
await self._schema_0_to_1()
|
||||
schema_version += 1
|
||||
await self.db.schema_version.set(schema_version)
|
||||
await self._config.schema_version.set(schema_version)
|
||||
|
||||
async def _schema_0_to_1(self):
|
||||
"""
|
||||
@@ -151,7 +177,7 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
"""
|
||||
|
||||
log.info("Begin updating guild configs to support multiple mod/admin roles")
|
||||
all_guild_data = await self.db.all_guilds()
|
||||
all_guild_data = await self._config.all_guilds()
|
||||
for guild_id, guild_data in all_guild_data.items():
|
||||
guild_obj = discord.Object(id=guild_id)
|
||||
mod_roles, admin_roles = [], []
|
||||
@@ -160,10 +186,10 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
|
||||
if maybe_mod_role_id:
|
||||
mod_roles.append(maybe_mod_role_id)
|
||||
await self.db.guild(guild_obj).mod_role.set(mod_roles)
|
||||
await self._config.guild(guild_obj).mod_role.set(mod_roles)
|
||||
if maybe_admin_role_id:
|
||||
admin_roles.append(maybe_admin_role_id)
|
||||
await self.db.guild(guild_obj).admin_role.set(admin_roles)
|
||||
await self._config.guild(guild_obj).admin_role.set(admin_roles)
|
||||
log.info("Done updating guild configs to support multiple mod/admin roles")
|
||||
|
||||
async def send_help_for(
|
||||
@@ -182,8 +208,8 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
:return:
|
||||
"""
|
||||
|
||||
indict["owner_id"] = await self.db.owner()
|
||||
i18n.set_locale(await self.db.locale())
|
||||
indict["owner_id"] = await self._config.owner()
|
||||
i18n.set_locale(await self._config.locale())
|
||||
|
||||
async def embed_requested(self, channel, user, command=None) -> bool:
|
||||
"""
|
||||
@@ -203,47 +229,112 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
bool
|
||||
:code:`True` if an embed is requested
|
||||
"""
|
||||
if isinstance(channel, discord.abc.PrivateChannel):
|
||||
user_setting = await self.db.user(user).embeds()
|
||||
if isinstance(channel, discord.abc.PrivateChannel) or (
|
||||
command and command == self.get_command("help")
|
||||
):
|
||||
user_setting = await self._config.user(user).embeds()
|
||||
if user_setting is not None:
|
||||
return user_setting
|
||||
else:
|
||||
guild_setting = await self.db.guild(channel.guild).embeds()
|
||||
guild_setting = await self._config.guild(channel.guild).embeds()
|
||||
if guild_setting is not None:
|
||||
return guild_setting
|
||||
global_setting = await self.db.embeds()
|
||||
global_setting = await self._config.embeds()
|
||||
return global_setting
|
||||
|
||||
async def is_owner(self, user):
|
||||
async def is_owner(self, user) -> bool:
|
||||
if user.id in self._co_owners:
|
||||
return True
|
||||
return await super().is_owner(user)
|
||||
|
||||
async def is_admin(self, member: discord.Member):
|
||||
async def is_admin(self, member: discord.Member) -> bool:
|
||||
"""Checks if a member is an admin of their guild."""
|
||||
try:
|
||||
member_snowflakes = member._roles # DEP-WARN
|
||||
for snowflake in await self.db.guild(member.guild).admin_role():
|
||||
for snowflake in await self._config.guild(member.guild).admin_role():
|
||||
if member_snowflakes.has(snowflake): # Dep-WARN
|
||||
return True
|
||||
except AttributeError: # someone passed a webhook to this
|
||||
pass
|
||||
return False
|
||||
|
||||
async def is_mod(self, member: discord.Member):
|
||||
async def is_mod(self, member: discord.Member) -> bool:
|
||||
"""Checks if a member is a mod or admin of their guild."""
|
||||
try:
|
||||
member_snowflakes = member._roles # DEP-WARN
|
||||
for snowflake in await self.db.guild(member.guild).admin_role():
|
||||
for snowflake in await self._config.guild(member.guild).admin_role():
|
||||
if member_snowflakes.has(snowflake): # DEP-WARN
|
||||
return True
|
||||
for snowflake in await self.db.guild(member.guild).mod_role():
|
||||
for snowflake in await self._config.guild(member.guild).mod_role():
|
||||
if member_snowflakes.has(snowflake): # DEP-WARN
|
||||
return True
|
||||
except AttributeError: # someone passed a webhook to this
|
||||
pass
|
||||
return False
|
||||
|
||||
async def get_admin_roles(self, guild: discord.Guild) -> List[discord.Role]:
|
||||
"""
|
||||
Gets the admin roles for a guild.
|
||||
"""
|
||||
ret: List[discord.Role] = []
|
||||
for snowflake in await self._config.guild(guild).admin_role():
|
||||
r = guild.get_role(snowflake)
|
||||
if r:
|
||||
ret.append(r)
|
||||
return ret
|
||||
|
||||
async def get_mod_roles(self, guild: discord.Guild) -> List[discord.Role]:
|
||||
"""
|
||||
Gets the mod roles for a guild.
|
||||
"""
|
||||
ret: List[discord.Role] = []
|
||||
for snowflake in await self._config.guild(guild).mod_role():
|
||||
r = guild.get_role(snowflake)
|
||||
if r:
|
||||
ret.append(r)
|
||||
return ret
|
||||
|
||||
async def get_admin_role_ids(self, guild_id: int) -> List[int]:
|
||||
"""
|
||||
Gets the admin role ids for a guild id.
|
||||
"""
|
||||
return await self._config.guild(discord.Object(id=guild_id)).admin_role()
|
||||
|
||||
async def get_mod_role_ids(self, guild_id: int) -> List[int]:
|
||||
"""
|
||||
Gets the mod role ids for a guild id.
|
||||
"""
|
||||
return await self._config.guild(discord.Object(id=guild_id)).mod_role()
|
||||
|
||||
async def get_shared_api_tokens(self, service_name: str) -> Dict[str, str]:
|
||||
"""
|
||||
Gets the shared API tokens for a service
|
||||
|
||||
Parameters
|
||||
----------
|
||||
service_name: str
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
A Mapping of token names to tokens.
|
||||
This mapping exists because some services have multiple tokens.
|
||||
"""
|
||||
return await self._config.api_tokens.get_raw(service_name, default={})
|
||||
|
||||
async def set_shared_api_tokens(self, service_name: str, **tokens: str):
|
||||
"""
|
||||
Sets shared API tokens for a service
|
||||
|
||||
In most cases, this should not be used. Users should instead be using the
|
||||
``set api`` command
|
||||
|
||||
This will not clear existing values not specified.
|
||||
"""
|
||||
|
||||
async with self._config.api_tokens.get_attr(service_name)() as method_abuse:
|
||||
method_abuse.update(**tokens)
|
||||
|
||||
async def get_context(self, message, *, cls=commands.Context):
|
||||
return await super().get_context(message, cls=cls)
|
||||
|
||||
@@ -269,15 +360,15 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
return os.listdir("cogs")
|
||||
|
||||
async def save_packages_status(self, packages):
|
||||
await self.db.packages.set(packages)
|
||||
await self._config.packages.set(packages)
|
||||
|
||||
async def add_loaded_package(self, pkg_name: str):
|
||||
async with self.db.packages() as curr_pkgs:
|
||||
async with self._config.packages() as curr_pkgs:
|
||||
if pkg_name not in curr_pkgs:
|
||||
curr_pkgs.append(pkg_name)
|
||||
|
||||
async def remove_loaded_package(self, pkg_name: str):
|
||||
async with self.db.packages() as curr_pkgs:
|
||||
async with self._config.packages() as curr_pkgs:
|
||||
while pkg_name in curr_pkgs:
|
||||
curr_pkgs.remove(pkg_name)
|
||||
|
||||
@@ -361,7 +452,7 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
else:
|
||||
ids_to_check.append(author.id)
|
||||
|
||||
immune_ids = await self.db.guild(guild).autoimmune_ids()
|
||||
immune_ids = await self._config.guild(guild).autoimmune_ids()
|
||||
|
||||
return any(i in immune_ids for i in ids_to_check)
|
||||
|
||||
@@ -545,14 +636,14 @@ class RedBase(commands.GroupMixin, commands.bot.BotBase, RPCMixin): # pylint: d
|
||||
Gets the users and channels to send to
|
||||
"""
|
||||
destinations = []
|
||||
opt_outs = await self.db.owner_opt_out_list()
|
||||
opt_outs = await self._config.owner_opt_out_list()
|
||||
for user_id in (self.owner_id, *self._co_owners):
|
||||
if user_id not in opt_outs:
|
||||
user = self.get_user(user_id)
|
||||
if user:
|
||||
destinations.append(user)
|
||||
|
||||
channel_ids = await self.db.extra_owner_destinations()
|
||||
channel_ids = await self._config.extra_owner_destinations()
|
||||
for channel_id in channel_ids:
|
||||
channel = self.get_channel(channel_id)
|
||||
if channel:
|
||||
|
||||
Reference in New Issue
Block a user