mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 03:08:55 -05:00
453 lines
19 KiB
Python
453 lines
19 KiB
Python
import asyncio
|
|
import contextlib
|
|
import platform
|
|
import sys
|
|
import codecs
|
|
import logging
|
|
import traceback
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Tuple
|
|
|
|
import aiohttp
|
|
import discord
|
|
import importlib.metadata
|
|
from packaging.requirements import Requirement
|
|
from redbot.core import data_manager
|
|
|
|
from redbot.core.commands import RedHelpFormatter, HelpSettings
|
|
from redbot.core.i18n import (
|
|
Translator,
|
|
set_contextual_locale,
|
|
set_contextual_regional_format,
|
|
set_contextual_locales_from_guild,
|
|
)
|
|
from .utils import AsyncIter
|
|
from .. import __version__ as red_version, version_info as red_version_info, VersionInfo
|
|
from . import commands
|
|
from .config import get_latest_confs
|
|
from .utils._internal_utils import (
|
|
fuzzy_command_search,
|
|
format_fuzzy_results,
|
|
expected_version,
|
|
fetch_latest_red_version_info,
|
|
send_to_owners_with_prefix_replaced,
|
|
)
|
|
from .utils.chat_formatting import inline, format_perms_list, humanize_timedelta
|
|
|
|
import rich
|
|
from rich import box
|
|
from rich.table import Table
|
|
from rich.columns import Columns
|
|
from rich.panel import Panel
|
|
from rich.text import Text
|
|
|
|
log = logging.getLogger("red")
|
|
|
|
INTRO = r"""
|
|
______ _ ______ _ _ ______ _
|
|
| ___ \ | | | _ (_) | | | ___ \ | |
|
|
| |_/ /___ __| | ______ | | | |_ ___ ___ ___ _ __ __| | | |_/ / ___ | |_
|
|
| // _ \/ _` | |______| | | | | / __|/ __/ _ \| '__/ _` | | ___ \/ _ \| __|
|
|
| |\ \ __/ (_| | | |/ /| \__ \ (_| (_) | | | (_| | | |_/ / (_) | |_
|
|
\_| \_\___|\__,_| |___/ |_|___/\___\___/|_| \__,_| \____/ \___/ \__|
|
|
"""
|
|
|
|
_ = Translator(__name__, __file__)
|
|
|
|
|
|
def get_outdated_red_messages(pypi_version: str, py_version_req: str) -> Tuple[str, str]:
|
|
outdated_red_message = _(
|
|
"Your Red instance is out of date! {} is the current version, however you are using {}!"
|
|
).format(pypi_version, red_version)
|
|
rich_outdated_message = (
|
|
f"[red]Outdated version![/red]\n"
|
|
f"[red]!!![/red]Version [cyan]{pypi_version}[/] is available, "
|
|
f"but you're using [cyan]{red_version}[/][red]!!![/red]"
|
|
)
|
|
current_python = platform.python_version()
|
|
extra_update = _(
|
|
"\n\nWhile the following command should work in most scenarios as it is "
|
|
"based on your current OS, environment, and Python version, "
|
|
"**we highly recommend you to read the update docs at <{docs}> and "
|
|
"make sure there is nothing else that "
|
|
"needs to be done during the update.**"
|
|
).format(docs="https://docs.discord.red/en/stable/update_red.html")
|
|
|
|
if not expected_version(current_python, py_version_req):
|
|
extra_update += _(
|
|
"\n\nYou have Python `{py_version}` and this update "
|
|
"requires `{req_py}`; you cannot simply run the update command.\n\n"
|
|
"You will need to follow the update instructions in our docs above, "
|
|
"if you still need help updating after following the docs go to our "
|
|
"#support channel in <https://discord.gg/red>"
|
|
).format(py_version=current_python, req_py=py_version_req)
|
|
outdated_red_message += extra_update
|
|
return outdated_red_message, rich_outdated_message
|
|
|
|
red_dist = importlib.metadata.distribution("Red-DiscordBot")
|
|
installed_extras = red_dist.metadata.get_all("Provides-Extra")
|
|
installed_extras.remove("dev")
|
|
installed_extras.remove("all")
|
|
distributions = {}
|
|
for req_str in red_dist.requires:
|
|
req = Requirement(req_str)
|
|
if req.marker is None or req.marker.evaluate():
|
|
continue
|
|
for extra in reversed(installed_extras):
|
|
if not req.marker.evaluate({"extra": extra}):
|
|
continue
|
|
|
|
# Check that the requirement is met.
|
|
# This is a bit simplified for our purposes and does not check
|
|
# whether the requirements of our requirements are met as well.
|
|
# This could potentially be an issue if we'll ever depend on
|
|
# a dependency's extra in our extra when we already depend on that
|
|
# in our base dependencies. However, considering that right now, all
|
|
# our dependencies are also fully pinned, this should not ever matter.
|
|
if req.name in distributions:
|
|
dist = distributions[req.name]
|
|
else:
|
|
try:
|
|
dist = importlib.metadata.distribution(req.name)
|
|
except importlib.metadata.PackageNotFoundError:
|
|
installed_extras.remove(extra)
|
|
dist = None
|
|
distributions[req.name] = dist
|
|
if dist is None or not req.specifier.contains(dist.version, prereleases=True):
|
|
installed_extras.remove(extra)
|
|
|
|
if installed_extras:
|
|
package_extras = f"[{','.join(installed_extras)}]"
|
|
else:
|
|
package_extras = ""
|
|
|
|
extra_update += _(
|
|
"\n\nTo update your bot, first shutdown your bot"
|
|
" then open a window of {console} (Not as admin) and run the following:"
|
|
"{command_1}\n"
|
|
"Once you've started up your bot again, we recommend that"
|
|
" you update any installed 3rd-party cogs with this command in Discord:"
|
|
"{command_2}"
|
|
).format(
|
|
console=_("Command Prompt") if platform.system() == "Windows" else _("Terminal"),
|
|
command_1=f'```"{sys.executable}" -m pip install -U "Red-DiscordBot{package_extras}"```',
|
|
command_2=f"```[p]cog update```",
|
|
)
|
|
outdated_red_message += extra_update
|
|
return outdated_red_message, rich_outdated_message
|
|
|
|
|
|
def init_events(bot, cli_flags):
|
|
@bot.event
|
|
async def on_connect():
|
|
if bot._uptime is None:
|
|
log.info("Connected to Discord. Getting ready...")
|
|
|
|
@bot.event
|
|
async def on_ready():
|
|
if bot._uptime is not None:
|
|
return
|
|
|
|
bot._uptime = datetime.utcnow()
|
|
|
|
guilds = len(bot.guilds)
|
|
users = len(set([m for m in bot.get_all_members()]))
|
|
|
|
invite_url = discord.utils.oauth_url(bot.application_id, scopes=("bot",))
|
|
|
|
prefixes = cli_flags.prefix or (await bot._config.prefix())
|
|
lang = await bot._config.locale()
|
|
dpy_version = discord.__version__
|
|
|
|
table_general_info = Table(show_edge=False, show_header=False, box=box.MINIMAL)
|
|
table_general_info.add_row("Prefixes", ", ".join(prefixes))
|
|
table_general_info.add_row("Language", lang)
|
|
table_general_info.add_row("Red version", red_version)
|
|
table_general_info.add_row("Discord.py version", dpy_version)
|
|
table_general_info.add_row("Storage type", data_manager.storage_type())
|
|
|
|
table_counts = Table(show_edge=False, show_header=False, box=box.MINIMAL)
|
|
# String conversion is needed as Rich doesn't deal with ints
|
|
table_counts.add_row("Shards", str(bot.shard_count))
|
|
table_counts.add_row("Servers", str(guilds))
|
|
if bot.intents.members: # Lets avoid 0 Unique Users
|
|
table_counts.add_row("Unique Users", str(users))
|
|
|
|
outdated_red_message = ""
|
|
rich_outdated_message = ""
|
|
with contextlib.suppress(aiohttp.ClientError, asyncio.TimeoutError):
|
|
pypi_version, py_version_req = await fetch_latest_red_version_info()
|
|
outdated = pypi_version and pypi_version > red_version_info
|
|
if outdated:
|
|
outdated_red_message, rich_outdated_message = get_outdated_red_messages(
|
|
pypi_version, py_version_req
|
|
)
|
|
|
|
rich_console = rich.get_console()
|
|
rich_console.print(INTRO, style="red", markup=False, highlight=False)
|
|
if guilds:
|
|
rich_console.print(
|
|
Columns(
|
|
[Panel(table_general_info, title=str(bot.user.name)), Panel(table_counts)],
|
|
equal=True,
|
|
align="center",
|
|
)
|
|
)
|
|
else:
|
|
rich_console.print(Columns([Panel(table_general_info, title=str(bot.user.name))]))
|
|
|
|
rich_console.print(
|
|
"Loaded {} cogs with {} commands".format(len(bot.cogs), len(bot.commands))
|
|
)
|
|
|
|
if invite_url:
|
|
rich_console.print(f"\nInvite URL: {Text(invite_url, style=f'link {invite_url}')}")
|
|
# We generally shouldn't care if the client supports it or not as Rich deals with it.
|
|
if not guilds:
|
|
rich_console.print(
|
|
f"Looking for a quick guide on setting up Red? Checkout {Text('https://start.discord.red', style='link https://start.discord.red}')}"
|
|
)
|
|
if rich_outdated_message:
|
|
rich_console.print(rich_outdated_message)
|
|
|
|
bot._red_ready.set()
|
|
if outdated_red_message:
|
|
await send_to_owners_with_prefix_replaced(bot, outdated_red_message)
|
|
|
|
@bot.event
|
|
async def on_command_completion(ctx: commands.Context):
|
|
await bot._delete_delay(ctx)
|
|
|
|
@bot.event
|
|
async def on_command_error(ctx, error, unhandled_by_cog=False):
|
|
if not unhandled_by_cog:
|
|
if hasattr(ctx.command, "on_error"):
|
|
return
|
|
|
|
if ctx.cog:
|
|
if ctx.cog.has_error_handler():
|
|
return
|
|
if not isinstance(error, commands.CommandNotFound):
|
|
asyncio.create_task(bot._delete_delay(ctx))
|
|
|
|
if isinstance(error, commands.MissingRequiredArgument):
|
|
await ctx.send_help()
|
|
elif isinstance(error, commands.ArgParserFailure):
|
|
msg = _("`{user_input}` is not a valid value for `{command}`").format(
|
|
user_input=error.user_input, command=error.cmd
|
|
)
|
|
if error.custom_help_msg:
|
|
msg += f"\n{error.custom_help_msg}"
|
|
await ctx.send(msg)
|
|
if error.send_cmd_help:
|
|
await ctx.send_help()
|
|
elif isinstance(error, commands.BadArgument):
|
|
if isinstance(error.__cause__, ValueError):
|
|
converter = ctx.current_parameter.converter
|
|
argument = ctx.current_argument
|
|
if converter is int:
|
|
await ctx.send(_('"{argument}" is not an integer.').format(argument=argument))
|
|
return
|
|
if converter is float:
|
|
await ctx.send(_('"{argument}" is not a number.').format(argument=argument))
|
|
return
|
|
if error.args:
|
|
await ctx.send(error.args[0])
|
|
else:
|
|
await ctx.send_help()
|
|
elif isinstance(error, commands.UserInputError):
|
|
await ctx.send_help()
|
|
elif isinstance(error, commands.DisabledCommand):
|
|
disabled_message = await bot._config.disabled_command_msg()
|
|
if disabled_message:
|
|
await ctx.send(disabled_message.replace("{command}", ctx.invoked_with))
|
|
elif isinstance(error, commands.CommandInvokeError):
|
|
log.exception(
|
|
"Exception in command '{}'".format(ctx.command.qualified_name),
|
|
exc_info=error.original,
|
|
)
|
|
exception_log = "Exception in command '{}'\n" "".format(ctx.command.qualified_name)
|
|
exception_log += "".join(
|
|
traceback.format_exception(type(error), error, error.__traceback__)
|
|
)
|
|
bot._last_exception = exception_log
|
|
|
|
message = await bot._config.invoke_error_msg()
|
|
if not message:
|
|
if ctx.author.id in bot.owner_ids:
|
|
message = inline(
|
|
_("Error in command '{command}'. Check your console or logs for details.")
|
|
)
|
|
else:
|
|
message = inline(_("Error in command '{command}'."))
|
|
await ctx.send(message.replace("{command}", ctx.command.qualified_name))
|
|
elif isinstance(error, commands.CommandNotFound):
|
|
help_settings = await HelpSettings.from_context(ctx)
|
|
fuzzy_commands = await fuzzy_command_search(
|
|
ctx,
|
|
commands=RedHelpFormatter.help_filter_func(
|
|
ctx, bot.walk_commands(), help_settings=help_settings
|
|
),
|
|
)
|
|
if not fuzzy_commands:
|
|
pass
|
|
elif await ctx.embed_requested():
|
|
await ctx.send(embed=await format_fuzzy_results(ctx, fuzzy_commands, embed=True))
|
|
else:
|
|
await ctx.send(await format_fuzzy_results(ctx, fuzzy_commands, embed=False))
|
|
elif isinstance(error, commands.BotMissingPermissions):
|
|
if bin(error.missing.value).count("1") == 1: # Only one perm missing
|
|
msg = _("I require the {permission} permission to execute that command.").format(
|
|
permission=format_perms_list(error.missing)
|
|
)
|
|
else:
|
|
msg = _("I require {permission_list} permissions to execute that command.").format(
|
|
permission_list=format_perms_list(error.missing)
|
|
)
|
|
await ctx.send(msg)
|
|
elif isinstance(error, commands.UserFeedbackCheckFailure):
|
|
if error.message:
|
|
await ctx.send(error.message)
|
|
elif isinstance(error, commands.NoPrivateMessage):
|
|
await ctx.send(_("That command is not available in DMs."))
|
|
elif isinstance(error, commands.PrivateMessageOnly):
|
|
await ctx.send(_("That command is only available in DMs."))
|
|
elif isinstance(error, commands.NSFWChannelRequired):
|
|
await ctx.send(_("That command is only available in NSFW channels."))
|
|
elif isinstance(error, commands.CheckFailure):
|
|
pass
|
|
elif isinstance(error, commands.CommandOnCooldown):
|
|
if bot._bypass_cooldowns and ctx.author.id in bot.owner_ids:
|
|
ctx.command.reset_cooldown(ctx)
|
|
new_ctx = await bot.get_context(ctx.message)
|
|
await bot.invoke(new_ctx)
|
|
return
|
|
relative_time = discord.utils.format_dt(
|
|
datetime.now(timezone.utc) + timedelta(seconds=error.retry_after), "R"
|
|
)
|
|
msg = _("This command is on cooldown. Try again {relative_time}.").format(
|
|
relative_time=relative_time
|
|
)
|
|
await ctx.send(msg, delete_after=error.retry_after)
|
|
elif isinstance(error, commands.MaxConcurrencyReached):
|
|
if error.per is commands.BucketType.default:
|
|
if error.number > 1:
|
|
msg = _(
|
|
"Too many people using this command."
|
|
" It can only be used {number} times concurrently."
|
|
).format(number=error.number)
|
|
else:
|
|
msg = _(
|
|
"Too many people using this command."
|
|
" It can only be used once concurrently."
|
|
)
|
|
elif error.per in (commands.BucketType.user, commands.BucketType.member):
|
|
if error.number > 1:
|
|
msg = _(
|
|
"That command is still completing,"
|
|
" it can only be used {number} times per {type} concurrently."
|
|
).format(number=error.number, type=error.per.name)
|
|
else:
|
|
msg = _(
|
|
"That command is still completing,"
|
|
" it can only be used once per {type} concurrently."
|
|
).format(type=error.per.name)
|
|
else:
|
|
if error.number > 1:
|
|
msg = _(
|
|
"Too many people using this command."
|
|
" It can only be used {number} times per {type} concurrently."
|
|
).format(number=error.number, type=error.per.name)
|
|
else:
|
|
msg = _(
|
|
"Too many people using this command."
|
|
" It can only be used once per {type} concurrently."
|
|
).format(type=error.per.name)
|
|
await ctx.send(msg)
|
|
else:
|
|
log.exception(type(error).__name__, exc_info=error)
|
|
|
|
@bot.event
|
|
async def on_message(message, /):
|
|
await set_contextual_locales_from_guild(bot, message.guild)
|
|
|
|
await bot.process_commands(message)
|
|
discord_now = message.created_at
|
|
if (
|
|
not bot._checked_time_accuracy
|
|
or (discord_now - timedelta(minutes=60)) > bot._checked_time_accuracy
|
|
):
|
|
system_now = datetime.now(timezone.utc)
|
|
diff = abs((discord_now - system_now).total_seconds())
|
|
if diff > 60:
|
|
log.warning(
|
|
"Detected significant difference (%d seconds) in system clock to discord's "
|
|
"clock. Any time sensitive code may fail.",
|
|
diff,
|
|
)
|
|
bot._checked_time_accuracy = discord_now
|
|
|
|
@bot.event
|
|
async def on_command_add(command: commands.Command):
|
|
if command.cog is not None:
|
|
return
|
|
|
|
await _disable_command_no_cog(command)
|
|
|
|
async def _guild_added(guild: discord.Guild):
|
|
disabled_commands = await bot._config.guild(guild).disabled_commands()
|
|
for command_name in disabled_commands:
|
|
command_obj = bot.get_command(command_name)
|
|
if command_obj is not None:
|
|
command_obj.disable_in(guild)
|
|
|
|
@bot.event
|
|
async def on_guild_join(guild: discord.Guild):
|
|
await _guild_added(guild)
|
|
|
|
@bot.event
|
|
async def on_guild_available(guild: discord.Guild):
|
|
# We need to check guild-disabled commands here since some cogs
|
|
# are loaded prior to `on_ready`.
|
|
await _guild_added(guild)
|
|
|
|
@bot.event
|
|
async def on_guild_remove(guild: discord.Guild):
|
|
# Clean up any unneeded checks
|
|
disabled_commands = await bot._config.guild(guild).disabled_commands()
|
|
for command_name in disabled_commands:
|
|
command_obj = bot.get_command(command_name)
|
|
if command_obj is not None:
|
|
command_obj.enable_in(guild)
|
|
|
|
@bot.event
|
|
async def on_cog_add(cog: commands.Cog):
|
|
confs = get_latest_confs()
|
|
for c in confs:
|
|
uuid = c.unique_identifier
|
|
group_data = c.custom_groups
|
|
await bot._config.custom("CUSTOM_GROUPS", c.cog_name, uuid).set(group_data)
|
|
|
|
await _disable_commands_cog(cog)
|
|
|
|
async def _disable_command(
|
|
command: commands.Command, global_disabled: list, guilds_data: dict
|
|
):
|
|
if command.qualified_name in global_disabled:
|
|
command.enabled = False
|
|
for guild_id, data in guilds_data.items():
|
|
guild_disabled_cmds = data.get("disabled_commands", [])
|
|
if command.qualified_name in guild_disabled_cmds:
|
|
command.disable_in(discord.Object(id=guild_id))
|
|
|
|
async def _disable_commands_cog(cog: commands.Cog):
|
|
global_disabled = await bot._config.disabled_commands()
|
|
guilds_data = await bot._config.all_guilds()
|
|
for command in cog.walk_commands():
|
|
await _disable_command(command, global_disabled, guilds_data)
|
|
|
|
async def _disable_command_no_cog(command: commands.Command):
|
|
global_disabled = await bot._config.disabled_commands()
|
|
guilds_data = await bot._config.all_guilds()
|
|
await _disable_command(command, global_disabled, guilds_data)
|