mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 03:08:55 -05:00
424 lines
15 KiB
Python
424 lines
15 KiB
Python
import asyncio
|
|
import importlib
|
|
import itertools
|
|
import logging
|
|
import sys
|
|
from collections import namedtuple
|
|
from random import SystemRandom
|
|
from string import ascii_letters, digits
|
|
|
|
import aiohttp
|
|
import discord
|
|
from discord.ext import commands
|
|
|
|
from redbot.core import checks
|
|
from redbot.core import i18n
|
|
|
|
import redbot.cogs # Don't remove this line or core cogs won't load
|
|
|
|
# Public names exported by a star-import of this module.
__all__ = ["find_spec", "Core"]
|
|
|
|
# Module-wide logger; everything in this file logs under the "red" logger name.
log = logging.getLogger("red")
|
|
|
|
# Warning text sent to the channel by the `set owner` command before the
# verification token is requested.
OWNER_DISCLAIMER = (
    "⚠ **Only** the person who is hosting Red should be owner. "
    "**This has SERIOUS security implications. "
    "The owner can access any data that is present on the host system.** ⚠"
)
|
|
|
|
|
|
async def find_spec(bot, cog_name: str):
    """Locate the module spec for the package named ``cog_name``.

    The bot's cog manager is asked first. If it raises ``RuntimeError``, the
    name is tried as a submodule of ``redbot.cogs`` instead.

    Returns whatever ``bot.cog_mgr.find_cog`` returned, or the ``__spec__``
    of the imported ``redbot.cogs`` submodule, or ``None`` when that import
    fails with ``ImportError``.
    """
    try:
        return await bot.cog_mgr.find_cog(cog_name)
    except RuntimeError:
        # Not resolvable through the cog manager; fall back to the bundled
        # cogs package below.
        pass

    relative_name = ".{}".format(cog_name)
    try:
        bundled = importlib.import_module(relative_name, package='redbot.cogs')
    except ImportError:
        return None
    return bundled.__spec__
|
|
|
|
|
|
# Translator for this module: every user-facing string below is passed
# through `_("...")` before being sent.
_ = i18n.CogI18n("Core", __file__)
|
|
|
|
|
|
class Core:
    """Commands related to core functions"""
    # NOTE: discord.py shows the class docstring and each command's docstring
    # as help text, so those are kept verbatim. Maintainer notes are written
    # as comments instead.

    @commands.command()
    @checks.is_owner()
    async def load(self, ctx, *, cog_name: str):
        """Loads a package"""
        spec = await find_spec(ctx.bot, cog_name)
        if spec is None:
            await ctx.send(_("No module by that name was found in any"
                             " cog path."))
            return

        try:
            ctx.bot.load_extension(spec)
        except Exception as e:
            log.exception("Package loading failed", exc_info=e)
            await ctx.send(_("Failed to load package. Check your console or "
                             "logs for details."))
        else:
            await ctx.bot.add_loaded_package(cog_name)
            await ctx.send(_("Done."))

    # Registered as a plain command: it has no subcommands, so the previous
    # `commands.group()` registration served no purpose.
    @commands.command()
    @checks.is_owner()
    async def unload(self, ctx, *, cog_name: str):
        """Unloads a package"""
        if cog_name in ctx.bot.extensions:
            ctx.bot.unload_extension(cog_name)
            await ctx.bot.remove_loaded_package(cog_name)
            await ctx.send(_("Done."))
        else:
            await ctx.send(_("That extension is not loaded."))

    @commands.command(name="reload")
    @checks.is_owner()
    async def _reload(self, ctx, *, cog_name: str):
        """Reloads a package"""
        ctx.bot.unload_extension(cog_name)
        self.cleanup_and_refresh_modules(cog_name)
        try:
            # Resolve the package the same way `load` does, so packages that
            # only exist under redbot.cogs can be reloaded as well.
            spec = await find_spec(ctx.bot, cog_name)
            if spec is None:
                # Routed through the handler below so a missing package is
                # logged and reported like any other reload failure.
                raise RuntimeError("No module named '{}' was found in any "
                                   "cog path.".format(cog_name))
            ctx.bot.load_extension(spec)
        except Exception as e:
            log.exception("Package reloading failed", exc_info=e)
            await ctx.send(_("Failed to reload package. Check your console or "
                             "logs for details."))
        else:
            curr_pkgs = await ctx.bot.db.packages()
            await ctx.bot.save_packages_status(curr_pkgs)
            await ctx.send(_("Done."))

    @commands.command(name="shutdown")
    @checks.is_owner()
    async def _shutdown(self, ctx, silently: bool=False):
        """Shuts down the bot"""
        wave = "\N{WAVING HAND SIGN}"
        skin = "\N{EMOJI MODIFIER FITZPATRICK TYPE-3}"
        if not silently:
            try:
                await ctx.send(_("Shutting down... ") + wave + skin)
            except Exception:
                # Best effort only: e.g. missing send permissions must not
                # stop the shutdown itself.
                log.debug("Could not send the shutdown notice", exc_info=True)
        await ctx.bot.shutdown()

    def cleanup_and_refresh_modules(self, module_name: str):
        """Internally reloads modules so that changes are detected.

        Re-executes, in place, every already-imported module along the dotted
        path of ``module_name`` (``a``, then ``a.b``, then ``a.b.c``), followed
        by every already-imported submodule below ``module_name``. Names that
        are not present in ``sys.modules`` are skipped.
        """
        def refresh(lib):
            # Private importlib API that re-runs the module's code in its
            # existing namespace using the spec it was loaded with.
            # NOTE(review): presumably used instead of importlib.reload()
            # because reload() looks the spec up again through the import
            # system, which may not know about cog paths - confirm.
            importlib._bootstrap._exec(lib.__spec__, lib)

        parts = module_name.split('.')
        dotted_paths = itertools.accumulate(
            parts, lambda parent, child: "{}.{}".format(parent, child))
        for name in dotted_paths:
            lib = sys.modules.get(name)
            if lib is not None:
                refresh(lib)

        # Only real descendants ("pkg.sub"), never modules that merely share
        # a name prefix ("pkgother"). The target itself was refreshed above.
        # Snapshot first: re-executing a module may import new ones and
        # change sys.modules while we iterate.
        child_prefix = module_name + "."
        children = [lib for name, lib in list(sys.modules.items())
                    if name.startswith(child_prefix) and lib is not None]
        for lib in children:
            refresh(lib)

    @staticmethod
    async def _get_dm_prefix(bot):
        # Returns the first prefix the bot answers to in DMs (the global one).
        # Since it can also be set through cli flags, bot.db is not a reliable
        # source. So we just mock a DM message (guild=None) and ask the bot's
        # own prefix callable.
        fake_message = namedtuple('Message', 'guild')
        prefixes = await bot.command_prefix(bot, fake_message(guild=None))
        return prefixes[0]

    @commands.group(name="set")
    async def _set(self, ctx):
        """Changes Red's settings"""
        if ctx.invoked_subcommand is None:
            await ctx.bot.send_cmd_help(ctx)

    @_set.command()
    @checks.guildowner()
    @commands.guild_only()
    async def adminrole(self, ctx, *, role: discord.Role):
        """Sets the admin role for this server"""
        await ctx.bot.db.guild(ctx.guild).admin_role.set(role.id)
        await ctx.send(_("The admin role for this guild has been set."))

    @_set.command()
    @checks.guildowner()
    @commands.guild_only()
    async def modrole(self, ctx, *, role: discord.Role):
        """Sets the mod role for this server"""
        await ctx.bot.db.guild(ctx.guild).mod_role.set(role.id)
        await ctx.send(_("The mod role for this guild has been set."))

    @_set.command()
    @checks.is_owner()
    async def avatar(self, ctx, url: str):
        """Sets Red's avatar"""
        # `async with` guarantees the session is closed even when the
        # download raises.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as r:
                data = await r.read()

        try:
            await ctx.bot.user.edit(avatar=data)
        except discord.HTTPException:
            await ctx.send(_("Failed. Remember that you can edit my avatar "
                             "up to two times a hour. The URL must be a "
                             "direct link to a JPG / PNG."))
        except discord.InvalidArgument:
            await ctx.send(_("JPG / PNG format only."))
        else:
            await ctx.send(_("Done."))

    @_set.command(name="game")
    @checks.is_owner()
    @commands.guild_only()
    async def _game(self, ctx, *, game: str):
        """Sets Red's playing status"""
        # Keep the current online status; only the game changes.
        status = ctx.me.status
        game = discord.Game(name=game)
        await ctx.bot.change_presence(status=status, game=game)
        await ctx.send(_("Game set."))

    @_set.command()
    @checks.is_owner()
    @commands.guild_only()
    async def status(self, ctx, *, status: str):
        """Sets Red's status
        Available statuses:
            online
            idle
            dnd
            invisible"""

        statuses = {
            "online": discord.Status.online,
            "idle": discord.Status.idle,
            "dnd": discord.Status.dnd,
            "invisible": discord.Status.invisible,
        }

        # Keep the current game; only the status changes.
        game = ctx.me.game

        try:
            new_status = statuses[status.lower()]
        except KeyError:
            # Unknown status name: show the command help instead.
            await ctx.bot.send_cmd_help(ctx)
        else:
            await ctx.bot.change_presence(status=new_status, game=game)
            await ctx.send(_("Status changed to %s.") % new_status)

    @_set.command()
    @checks.is_owner()
    @commands.guild_only()
    async def stream(self, ctx, streamer=None, *, stream_title=None):
        """Sets Red's streaming status
        Leaving both streamer and stream_title empty will clear it."""

        status = ctx.me.status

        if stream_title:
            stream_title = stream_title.strip()
            if "twitch.tv/" not in streamer:
                streamer = "https://www.twitch.tv/" + streamer
            game = discord.Game(type=1, url=streamer, name=stream_title)
            await ctx.bot.change_presence(game=game, status=status)
        elif streamer is not None:
            # A streamer without a title is incomplete input.
            await ctx.bot.send_cmd_help(ctx)
            return
        else:
            await ctx.bot.change_presence(game=None, status=status)
        await ctx.send(_("Done."))

    @_set.command(name="username", aliases=["name"])
    @checks.is_owner()
    async def _username(self, ctx, *, username: str):
        """Sets Red's username"""
        try:
            await ctx.bot.user.edit(username=username)
        except discord.HTTPException:
            await ctx.send(_("Failed to change name. Remember that you can "
                             "only do it up to 2 times an hour. Use "
                             "nicknames if you need frequent changes. "
                             "`{}set nickname`").format(ctx.prefix))
        else:
            await ctx.send(_("Done."))

    @_set.command(name="nickname")
    @checks.admin()
    @commands.guild_only()
    async def _nickname(self, ctx, *, nickname: str):
        """Sets Red's nickname"""
        try:
            # Nicknames are per-guild, so the bot's member object in this
            # guild is edited, not the account-wide user profile.
            await ctx.guild.me.edit(nick=nickname)
        except discord.Forbidden:
            await ctx.send(_("I lack the permissions to change my own "
                             "nickname."))
        else:
            await ctx.send(_("Done."))

    @_set.command(aliases=["prefixes"])
    @checks.is_owner()
    async def prefix(self, ctx, *prefixes):
        """Sets Red's global prefix(es)"""
        if not prefixes:
            await ctx.bot.send_cmd_help(ctx)
            return
        prefixes = sorted(prefixes, reverse=True)
        await ctx.bot.db.prefix.set(prefixes)
        await ctx.send(_("Prefix set."))

    @_set.command(aliases=["serverprefixes"])
    @checks.admin()
    @commands.guild_only()
    async def serverprefix(self, ctx, *prefixes):
        """Sets Red's server prefix(es)"""
        if not prefixes:
            # No arguments resets the guild's prefixes.
            await ctx.bot.db.guild(ctx.guild).prefix.set([])
            await ctx.send(_("Guild prefixes have been reset."))
            return
        prefixes = sorted(prefixes, reverse=True)
        await ctx.bot.db.guild(ctx.guild).prefix.set(prefixes)
        await ctx.send(_("Prefix set."))

    @_set.command()
    @commands.cooldown(1, 60 * 10, commands.BucketType.default)
    async def owner(self, ctx):
        """Sets Red's main owner"""
        def check(m):
            # Only accept the reply from the requester, in the same channel.
            return m.author == ctx.author and m.channel == ctx.channel

        # According to the Python docs this is suitable for cryptographic use
        rng = SystemRandom()
        length = rng.randint(25, 35)
        chars = ascii_letters + digits
        token = "".join(rng.choice(chars) for _i in range(length))

        log.info("{0} ({0.id}) requested to be set as owner."
                 "".format(ctx.author))
        # The token is shown only on the host's console, so only someone with
        # access to the host can complete the verification.
        print(_("\nVerification token:"))
        print(token)

        await ctx.send(_("Remember:\n") + OWNER_DISCLAIMER)
        await asyncio.sleep(5)

        await ctx.send(_("I have printed a one-time token in the console. "
                         "Copy and paste it here to confirm you are the owner."))

        try:
            message = await ctx.bot.wait_for("message", check=check,
                                             timeout=60)
        except asyncio.TimeoutError:
            self.owner.reset_cooldown(ctx)
            await ctx.send(_("The set owner request has timed out."))
        else:
            if message.content.strip() == token:
                self.owner.reset_cooldown(ctx)
                await ctx.bot.db.owner.set(ctx.author.id)
                ctx.bot.owner_id = ctx.author.id
                await ctx.send(_("You have been set as owner."))
            else:
                # A wrong token leaves the cooldown in place.
                await ctx.send(_("Invalid token."))

    @_set.command()
    @checks.is_owner()
    async def locale(self, ctx: commands.Context, locale_name: str):
        """
        Changes bot locale.
        """
        i18n.set_locale(locale_name)

        await ctx.bot.db.locale.set(locale_name)

        await ctx.send(_("Locale has been set."))

    @commands.command()
    @commands.cooldown(1, 60, commands.BucketType.user)
    async def contact(self, ctx, *, message: str):
        """Sends a message to the owner"""
        guild = ctx.message.guild
        owner = discord.utils.get(ctx.bot.get_all_members(),
                                  id=ctx.bot.owner_id)
        if owner is None:
            # The owner is not among the members the bot can see; say so
            # explicitly instead of failing on `None.send(...)` below.
            await ctx.send(_("I cannot send your message, I'm unable to find "
                             "my owner... *sigh*"))
            return

        author = ctx.message.author
        footer = _("User ID: %s") % author.id

        if ctx.guild is None:
            source = _("through DM")
        else:
            source = _("from {}").format(guild)
            footer += _(" | Server ID: %s") % guild.id

        prefix = await self._get_dm_prefix(ctx.bot)

        content = _("Use `{}dm {} <text>` to reply to this user"
                    "").format(prefix, author.id)

        if isinstance(author, discord.Member):
            colour = author.colour
        else:
            colour = discord.Colour.red()

        description = _("Sent by {} {}").format(author, source)

        e = discord.Embed(colour=colour, description=message)
        if author.avatar_url:
            e.set_author(name=description, icon_url=author.avatar_url)
        else:
            e.set_author(name=description)
        e.set_footer(text=footer)

        try:
            await owner.send(content, embed=e)
        except discord.InvalidArgument:
            await ctx.send(_("I cannot send your message, I'm unable to find "
                             "my owner... *sigh*"))
        except Exception:
            # Any delivery failure is reported to the sender rather than
            # surfacing as a command error.
            await ctx.send(_("I'm unable to deliver your message. Sorry."))
        else:
            await ctx.send(_("Your message has been sent."))

    @commands.command()
    @checks.is_owner()
    async def dm(self, ctx, user_id: int, *, message: str):
        """Sends a DM to a user

        This command needs a user id to work.
        To get a user id enable 'developer mode' in Discord's
        settings, 'appearance' tab. Then right click a user
        and copy their id"""
        destination = discord.utils.get(ctx.bot.get_all_members(),
                                        id=user_id)
        if destination is None:
            await ctx.send(_("Invalid ID or user not found. You can only "
                             "send messages to people I share a server "
                             "with."))
            return

        e = discord.Embed(colour=discord.Colour.red(), description=message)
        description = _("Owner of %s") % ctx.bot.user
        prefix = await self._get_dm_prefix(ctx.bot)
        e.set_footer(text=_("You can reply to this message with %scontact"
                            "") % prefix)
        if ctx.bot.user.avatar_url:
            e.set_author(name=description, icon_url=ctx.bot.user.avatar_url)
        else:
            e.set_author(name=description)

        try:
            await destination.send(embed=e)
        except Exception:
            # Any delivery failure is reported to the owner rather than
            # surfacing as a command error.
            await ctx.send(_("Sorry, I couldn't deliver your message "
                             "to %s") % destination)
        else:
            await ctx.send(_("Message delivered to %s") % destination)
|