import asyncio
import datetime
import importlib
import itertools
import logging
import sys
import traceback
from collections import namedtuple
from pathlib import Path
from random import SystemRandom
from string import ascii_letters, digits
from distutils.version import StrictVersion

import aiohttp
import discord
import pkg_resources
from discord.ext import commands

from redbot.core import __version__
from redbot.core import checks
from redbot.core import i18n
from redbot.core import rpc
from redbot.core.context import RedContext

from .utils import TYPE_CHECKING
from .utils.chat_formatting import pagify, box

if TYPE_CHECKING:
    from redbot.core.bot import Red

__all__ = ["Core"]

log = logging.getLogger("red")

OWNER_DISCLAIMER = ("⚠ **Only** the person who is hosting Red should be "
                    "owner. **This has SERIOUS security implications. The "
                    "owner can access any data that is present on the host "
                    "system.** ⚠")

# Translator for user-facing strings. NOTE: ``_`` must never be reused as a
# throwaway variable name inside this module, it would shadow the translator.
_ = i18n.CogI18n("Core", __file__)


class Core:
    """Commands related to core functions"""

    def __init__(self, bot):
        self.bot = bot  # type: Red

        # Expose the package management helpers over RPC.
        rpc.add_method('core', self.rpc_load)
        rpc.add_method('core', self.rpc_unload)
        rpc.add_method('core', self.rpc_reload)

    @commands.command()
    async def info(self, ctx: RedContext):
        """Shows info about Red"""
        author_repo = "https://github.com/Twentysix26"
        org_repo = "https://github.com/Cog-Creators"
        red_repo = org_repo + "/Red-DiscordBot"
        red_pypi = "https://pypi.python.org/pypi/Red-DiscordBot"
        support_server_url = "https://discord.gg/red"
        dpy_repo = "https://github.com/Rapptz/discord.py"
        python_url = "https://www.python.org/"
        since = datetime.datetime(2016, 1, 2, 0, 0)
        days_since = (datetime.datetime.utcnow() - since).days
        dpy_version = "[{}]({})".format(discord.__version__, dpy_repo)
        python_version = "[{}.{}.{}]({})".format(
            *sys.version_info[:3], python_url
        )
        red_version = "[{}]({})".format(__version__, red_pypi)
        app_info = await self.bot.application_info()
        owner = app_info.owner

        # The update check is best-effort: a network failure, an unexpected
        # payload or a version string StrictVersion cannot parse must not
        # prevent the info embed from being shown.
        latest_version = None
        outdated = False
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                        "http://pypi.python.org/pypi/red-discordbot/json") as r:
                    data = await r.json()
            latest_version = data["info"]["version"]
            outdated = (StrictVersion(latest_version)
                        > StrictVersion(__version__))
        except (aiohttp.ClientError, asyncio.TimeoutError,
                KeyError, TypeError, ValueError):
            log.debug("Could not check PyPI for a newer Red version",
                      exc_info=True)
            outdated = False

        about = (
            "This is an instance of [Red, an open source Discord bot]({}) "
            "created by [Twentysix]({}) and [improved by many]({}).\n\n"
            "Red is backed by a passionate community who contributes and "
            "creates content for everyone to enjoy. [Join us today]({}) "
            "and help us improve!\n\n"
            "".format(red_repo, author_repo, org_repo, support_server_url))

        embed = discord.Embed(color=discord.Color.red())
        embed.add_field(name="Instance owned by", value=str(owner))
        embed.add_field(name="Python", value=python_version)
        embed.add_field(name="discord.py", value=dpy_version)
        embed.add_field(name="Red version", value=red_version)
        if outdated:
            embed.add_field(name="Outdated",
                            value="Yes, {} is available".format(
                                latest_version
                            ))
        embed.add_field(name="About Red", value=about, inline=False)
        embed.set_footer(text="Bringing joy since 02 Jan 2016 (over "
                              "{} days ago!)".format(days_since))
        try:
            await ctx.send(embed=embed)
        except discord.HTTPException:
            await ctx.send("I need the `Embed links` permission to send this")

    @commands.command()
    async def uptime(self, ctx: RedContext):
        """Shows Red's uptime"""
        since = ctx.bot.uptime.strftime("%Y-%m-%d %H:%M:%S")
        passed = self.get_bot_uptime()
        await ctx.send(
            "Been up for: **{}** (since {} UTC)".format(
                passed, since
            )
        )

    def get_bot_uptime(self, *, brief=False):
        """Return the time elapsed since ``self.bot.uptime`` as a string.

        With ``brief=True`` a compact ``1d 2h 3m 4s`` form is produced,
        otherwise a verbose ``1 days, 2 hours, 3 minutes, and 4 seconds``
        form. The days component is omitted while it is zero.
        """
        # Courtesy of Danny
        now = datetime.datetime.utcnow()
        delta = now - self.bot.uptime
        hours, remainder = divmod(int(delta.total_seconds()), 3600)
        minutes, seconds = divmod(remainder, 60)
        days, hours = divmod(hours, 24)

        if not brief:
            if days:
                fmt = '{d} days, {h} hours, {m} minutes, and {s} seconds'
            else:
                fmt = '{h} hours, {m} minutes, and {s} seconds'
        else:
            fmt = '{h}h {m}m {s}s'
            if days:
                fmt = '{d}d ' + fmt

        return fmt.format(d=days, h=hours, m=minutes, s=seconds)

    @commands.command()
    @checks.is_owner()
    async def traceback(self, ctx, public: bool=False):
        """Sends to the owner the last command exception that has occurred

        If public (yes is specified), it will be sent to the chat instead"""
        if not public:
            destination = ctx.author
        else:
            destination = ctx.channel

        if self.bot._last_exception:
            for page in pagify(self.bot._last_exception):
                await destination.send(box(page, lang="py"))
        else:
            await ctx.send("No exception has occurred yet")

    @commands.command()
    @checks.is_owner()
    async def invite(self, ctx):
        """Shows Red's invite url"""
        if self.bot.user.bot:
            await ctx.author.send(discord.utils.oauth_url(self.bot.user.id))
        else:
            await ctx.send("I'm not a bot account. I have no invite URL.")

    @commands.command()
    @commands.guild_only()
    @checks.is_owner()
    async def leave(self, ctx):
        """Leaves server"""
        author = ctx.author
        guild = ctx.guild

        await ctx.send("Are you sure you want me to leave this server?"
                       " Type yes to confirm.")

        def conf_check(m):
            return m.author == author

        response = await self.bot.wait_for("message", check=conf_check)

        if response.content.lower().strip() == "yes":
            await ctx.send("Alright. Bye :wave:")
            log.debug("Leaving '{}'".format(guild.name))
            await guild.leave()

    @commands.command()
    @checks.is_owner()
    async def servers(self, ctx):
        """Lists and allows to leave servers"""
        owner = ctx.author
        guilds = sorted(list(self.bot.guilds),
                        key=lambda s: s.name.lower())
        msg = ""
        for i, server in enumerate(guilds):
            msg += "{}: {}\n".format(i, server.name)

        msg += "\nTo leave a server, just type its number."

        for page in pagify(msg, ['\n']):
            await ctx.send(page)

        def msg_check(m):
            return m.author == owner

        # Keep listening until the owner types a valid index or stops
        # answering for 15 seconds.
        while True:
            try:
                reply = await self.bot.wait_for("message", check=msg_check,
                                                timeout=15)
            except asyncio.TimeoutError:
                await ctx.send("I guess not.")
                break

            try:
                index = int(reply.content)
            except ValueError:
                # Not a number: ignore and wait for the next message.
                continue

            # Only the indexes that were actually listed are accepted;
            # a negative number must not wrap around to the end of the list.
            if 0 <= index < len(guilds):
                await self.leave_confirmation(guilds[index], owner, ctx)
                break

    async def leave_confirmation(self, server, owner, ctx):
        """Ask ``owner`` to confirm leaving ``server`` and act on the reply.

        The next message authored by ``owner`` within 15 seconds is taken
        as the answer; ``yes`` / ``y`` (case-insensitive) leaves the
        server, anything else cancels. "Done." is only sent when the
        server that was left is not the one the command was invoked in.
        """
        await ctx.send("Are you sure you want me to leave {}? (yes/no)".format(server.name))

        def conf_check(m):
            return m.author == owner

        try:
            msg = await self.bot.wait_for("message", check=conf_check, timeout=15)
            if msg.content.lower().strip() in ("yes", "y"):
                await server.leave()
                if server != ctx.guild:
                    await ctx.send("Done.")
            else:
                await ctx.send("Alright then.")
        except asyncio.TimeoutError:
            await ctx.send("I guess not.")

    @commands.command()
    @checks.is_owner()
    async def load(self, ctx, *, cog_name: str):
        """Loads a package"""
        try:
            spec = await ctx.bot.cog_mgr.find_cog(cog_name)
        except RuntimeError:
            await ctx.send(_("No module by that name was found in any"
                             " cog path."))
            return

        try:
            await ctx.bot.load_extension(spec)
        except Exception as e:
            log.exception("Package loading failed", exc_info=e)

            # Keep the formatted traceback around for the `traceback` command.
            exception_log = ("Exception in command '{}'\n"
                             "".format(ctx.command.qualified_name))
            exception_log += "".join(traceback.format_exception(type(e),
                                     e, e.__traceback__))
            self.bot._last_exception = exception_log

            await ctx.send(_("Failed to load package. Check your console or "
                             "logs for details."))
        else:
            await ctx.bot.add_loaded_package(cog_name)
            await ctx.send(_("Done."))

    @commands.group()
    @checks.is_owner()
    async def unload(self, ctx, *, cog_name: str):
        """Unloads a package"""
        if cog_name in ctx.bot.extensions:
            ctx.bot.unload_extension(cog_name)
            await ctx.bot.remove_loaded_package(cog_name)
            await ctx.send(_("Done."))
        else:
            await ctx.send(_("That extension is not loaded."))

    @commands.command(name="reload")
    @checks.is_owner()
    async def _reload(self, ctx, *, cog_name: str):
        """Reloads a package"""
        # Resolve the package first: if the name is wrong we must not leave
        # a previously working package unloaded.
        try:
            spec = await ctx.bot.cog_mgr.find_cog(cog_name)
        except RuntimeError:
            await ctx.send(_("No module by that name was found in any"
                             " cog path."))
            return

        ctx.bot.unload_extension(cog_name)

        try:
            self.cleanup_and_refresh_modules(spec.name)
            await ctx.bot.load_extension(spec)
        except Exception as e:
            log.exception("Package reloading failed", exc_info=e)

            # Keep the formatted traceback around for the `traceback` command.
            exception_log = ("Exception in command '{}'\n"
                             "".format(ctx.command.qualified_name))
            exception_log += "".join(traceback.format_exception(type(e),
                                     e, e.__traceback__))
            self.bot._last_exception = exception_log

            await ctx.send(_("Failed to reload package. Check your console or "
                             "logs for details."))
        else:
            await ctx.send(_("Done."))

    @commands.command(name="shutdown")
    @checks.is_owner()
    async def _shutdown(self, ctx, silently: bool=False):
        """Shuts down the bot"""
        wave = "\N{WAVING HAND SIGN}"
        skin = "\N{EMOJI MODIFIER FITZPATRICK TYPE-3}"
        try:  # We don't want missing perms to stop our shutdown
            if not silently:
                await ctx.send(_("Shutting down... ") + wave + skin)
        except Exception:
            # Deliberately best-effort; cancellation still propagates.
            pass
        await ctx.bot.shutdown()

    @commands.command(name="restart")
    @checks.is_owner()
    async def _restart(self, ctx, silently: bool=False):
        """Attempts to restart Red

        Makes Red quit with exit code 26
        The restart is not guaranteed: it must be dealt
        with by the process manager in use"""
        try:
            if not silently:
                await ctx.send(_("Restarting..."))
        except Exception:
            # Deliberately best-effort; cancellation still propagates.
            pass
        await ctx.bot.shutdown(restart=True)

    def cleanup_and_refresh_modules(self, module_name: str):
        """Internally reloads modules so that changes are detected

        Every already-imported ancestor package of ``module_name`` is
        re-executed first (outermost first), followed by ``module_name``
        itself and each of its already-imported submodules.
        """
        splitted = module_name.split('.')

        def maybe_reload(new_name):
            try:
                lib = sys.modules[new_name]
            except KeyError:
                pass
            else:
                importlib._bootstrap._exec(lib.__spec__, lib)

        # "a.b.c" -> "a", "a.b", "a.b.c"
        modules = itertools.accumulate(splitted, "{}.{}".format)
        for m in modules:
            maybe_reload(m)

        # Match the module itself or real submodules only; a bare
        # startswith(module_name) would also hit unrelated siblings such
        # as "foobar" when reloading "foo".
        prefix = module_name + "."
        # Snapshot first: executing a module may mutate sys.modules.
        children = [lib for name, lib in list(sys.modules.items())
                    if name == module_name or name.startswith(prefix)]
        for lib in children:
            importlib._bootstrap._exec(lib.__spec__, lib)

    @commands.group(name="set")
    async def _set(self, ctx):
        """Changes Red's settings"""
        if ctx.invoked_subcommand is None:
            await ctx.send_help()

    @_set.command()
    @checks.guildowner()
    @commands.guild_only()
    async def adminrole(self, ctx, *, role: discord.Role):
        """Sets the admin role for this server"""
        await ctx.bot.db.guild(ctx.guild).admin_role.set(role.id)
        await ctx.send(_("The admin role for this guild has been set."))

    @_set.command()
    @checks.guildowner()
    @commands.guild_only()
    async def modrole(self, ctx, *, role: discord.Role):
        """Sets the mod role for this server"""
        await ctx.bot.db.guild(ctx.guild).mod_role.set(role.id)
        await ctx.send(_("The mod role for this guild has been set."))

    @_set.command()
    @checks.is_owner()
    async def avatar(self, ctx, url: str):
        """Sets Red's avatar"""
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as r:
                data = await r.read()

        try:
            await ctx.bot.user.edit(avatar=data)
        except discord.HTTPException:
            await ctx.send(_("Failed. Remember that you can edit my avatar "
                             "up to two times a hour. The URL must be a "
                             "direct link to a JPG / PNG."))
        except discord.InvalidArgument:
            await ctx.send(_("JPG / PNG format only."))
        else:
            await ctx.send(_("Done."))

    @_set.command(name="game")
    @checks.is_owner()
    async def _game(self, ctx, *, game: str=None):
        """Sets Red's playing status"""
        if game:
            game = discord.Game(name=game)
        else:
            game = None
        # Preserve the current status; it is read from the first guild.
        status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 \
            else discord.Status.online
        for shard in ctx.bot.shards:
            await ctx.bot.change_presence(status=status, activity=game)
        await ctx.send(_("Game set."))

    @_set.command(name="listening")
    @checks.is_owner()
    async def _listening(self, ctx, *, listening: str=None):
        """Sets Red's listening status"""
        status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 \
            else discord.Status.online
        if listening:
            activity = discord.Activity(name=listening,
                                        type=discord.ActivityType.listening)
        else:
            activity = None
        for shard in ctx.bot.shards:
            await ctx.bot.change_presence(status=status, activity=activity)
        await ctx.send(_("Listening set."))

    @_set.command(name="watching")
    @checks.is_owner()
    async def _watching(self, ctx, *, watching: str=None):
        """Sets Red's watching status"""
        status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 \
            else discord.Status.online
        if watching:
            activity = discord.Activity(name=watching,
                                        type=discord.ActivityType.watching)
        else:
            activity = None
        for shard in ctx.bot.shards:
            await ctx.bot.change_presence(status=status, activity=activity)
        await ctx.send(_("Watching set."))

    @_set.command()
    @checks.is_owner()
    async def status(self, ctx, *, status: str):
        """Sets Red's status

        Available statuses:
            online
            idle
            dnd
            invisible
        """
        statuses = {
            "online": discord.Status.online,
            "idle": discord.Status.idle,
            "dnd": discord.Status.dnd,
            "invisible": discord.Status.invisible
        }

        # Preserve the current activity; it is read from the first guild.
        game = ctx.bot.guilds[0].me.activity if len(ctx.bot.guilds) > 0 else None
        try:
            status = statuses[status.lower()]
        except KeyError:
            await ctx.send_help()
        else:
            for shard in ctx.bot.shards:
                await ctx.bot.change_presence(status=status,
                                              activity=game)
            await ctx.send(_("Status changed to %s.") % status)

    @_set.command()
    @checks.is_owner()
    async def stream(self, ctx, streamer=None, *, stream_title=None):
        """Sets Red's streaming status

        Leaving both streamer and stream_title empty will clear it."""
        status = ctx.bot.guilds[0].me.status \
            if len(ctx.bot.guilds) > 0 else None

        if stream_title:
            stream_title = stream_title.strip()
            if "twitch.tv/" not in streamer:
                streamer = "https://www.twitch.tv/" + streamer
            activity = discord.Streaming(url=streamer, name=stream_title)
            for shard in ctx.bot.shards:
                await ctx.bot.change_presence(status=status, activity=activity)
        elif streamer is not None:
            # A streamer without a title is an incomplete invocation.
            await ctx.send_help()
            return
        else:
            for shard in ctx.bot.shards:
                await ctx.bot.change_presence(activity=None, status=status)
        await ctx.send(_("Done."))

    @_set.command(name="username", aliases=["name"])
    @checks.is_owner()
    async def _username(self, ctx, *, username: str):
        """Sets Red's username"""
        try:
            await ctx.bot.user.edit(username=username)
        except discord.HTTPException:
            await ctx.send(_("Failed to change name. Remember that you can "
                             "only do it up to 2 times an hour. Use "
                             "nicknames if you need frequent changes. "
                             "`{}set nickname`").format(ctx.prefix))
        else:
            await ctx.send(_("Done."))

    @_set.command(name="nickname")
    @checks.admin()
    @commands.guild_only()
    async def _nickname(self, ctx, *, nickname: str):
        """Sets Red's nickname"""
        try:
            # Nicknames are per-guild, so the guild member object has to be
            # edited rather than the global client user.
            await ctx.guild.me.edit(nick=nickname)
        except discord.Forbidden:
            await ctx.send(_("I lack the permissions to change my own "
                             "nickname."))
        else:
            await ctx.send(_("Done."))

    @_set.command(aliases=["prefixes"])
    @checks.is_owner()
    async def prefix(self, ctx, *prefixes):
        """Sets Red's global prefix(es)"""
        if not prefixes:
            await ctx.send_help()
            return
        prefixes = sorted(prefixes, reverse=True)
        await ctx.bot.db.prefix.set(prefixes)
        await ctx.send(_("Prefix set."))

    @_set.command(aliases=["serverprefixes"])
    @checks.admin()
    @commands.guild_only()
    async def serverprefix(self, ctx, *prefixes):
        """Sets Red's server prefix(es)"""
        if not prefixes:
            await ctx.bot.db.guild(ctx.guild).prefix.set([])
            await ctx.send(_("Guild prefixes have been reset."))
            return
        prefixes = sorted(prefixes, reverse=True)
        await ctx.bot.db.guild(ctx.guild).prefix.set(prefixes)
        await ctx.send(_("Prefix set."))

    @_set.command()
    @commands.cooldown(1, 60 * 10, commands.BucketType.default)
    async def owner(self, ctx):
        """Sets Red's main owner"""
        def check(m):
            return m.author == ctx.author and m.channel == ctx.channel

        # According to the Python docs this is suitable for cryptographic use
        random = SystemRandom()
        length = random.randint(25, 35)
        chars = ascii_letters + digits
        token = "".join(random.choice(chars) for _i in range(length))
        log.info("{0} ({0.id}) requested to be set as owner."
                 "".format(ctx.author))
        # The token is only ever shown on the host's console, which is what
        # proves that the requester has access to the machine.
        print(_("\nVerification token:"))
        print(token)

        await ctx.send(_("Remember:\n") + OWNER_DISCLAIMER)
        await asyncio.sleep(5)

        await ctx.send(_("I have printed a one-time token in the console. "
                         "Copy and paste it here to confirm you are the owner."))

        try:
            message = await ctx.bot.wait_for("message", check=check,
                                             timeout=60)
        except asyncio.TimeoutError:
            self.owner.reset_cooldown(ctx)
            await ctx.send(_("The set owner request has timed out."))
        else:
            if message.content.strip() == token:
                self.owner.reset_cooldown(ctx)
                await ctx.bot.db.owner.set(ctx.author.id)
                ctx.bot.owner_id = ctx.author.id
                await ctx.send(_("You have been set as owner."))
            else:
                # The cooldown is intentionally left in place on a wrong
                # token.
                await ctx.send(_("Invalid token."))

    @_set.command()
    @checks.is_owner()
    async def token(self, ctx, token: str):
        """Change bot token."""
        # The token must only be accepted in DMs; anywhere else it has
        # already been exposed to other users.
        if not isinstance(ctx.channel, discord.DMChannel):
            try:
                await ctx.message.delete()
            except discord.Forbidden:
                pass

            await ctx.send(
                _("Please use that command in DM. Since users probably saw your token,"
                  " it is recommended to reset it right now. Go to the following link and"
                  " select `Reveal Token` and `Generate a new token?`."
                  "\n\nhttps://discordapp.com/developers/applications/me/{}").format(self.bot.user.id))
            return

        await ctx.bot.db.token.set(token)
        await ctx.send("Token set. Restart me.")

    @_set.command()
    @checks.is_owner()
    async def locale(self, ctx: commands.Context, locale_name: str):
        """
        Changes bot locale.

        Use [p]listlocales to get a list of available locales.

        To reset to English, use "en-US".
        """
        i18n.set_locale(locale_name)

        await ctx.bot.db.locale.set(locale_name)

        await ctx.send(_("Locale has been set."))

    @_set.command()
    @checks.is_owner()
    async def sentry(self, ctx: commands.Context, on_or_off: bool):
        """Enable or disable Sentry logging.

        Sentry is the service Red uses to manage error reporting. This should
        be disabled if you have made your own modifications to the redbot
        package.
        """
        await ctx.bot.db.enable_sentry.set(on_or_off)
        if on_or_off:
            ctx.bot.enable_sentry()
            await ctx.send(_("Done. Sentry logging is now enabled."))
        else:
            ctx.bot.disable_sentry()
            await ctx.send(_("Done. Sentry logging is now disabled."))

    @commands.command()
    @checks.is_owner()
    async def listlocales(self, ctx: RedContext):
        """
        Lists all available locales

        Use `[p]set locale` to set a locale
        """
        async with ctx.channel.typing():
            red_dist = pkg_resources.get_distribution("red-discordbot")
            red_path = Path(red_dist.location) / "redbot"
            # One entry per distinct .po file stem found under the package.
            locale_list = sorted({loc.stem for loc in red_path.glob("**/*.po")})
            pages = pagify("\n".join(locale_list))

        await ctx.send_interactive(
            pages, box_lang="Available Locales:"
        )

    @commands.command()
    @commands.cooldown(1, 60, commands.BucketType.user)
    async def contact(self, ctx, *, message: str):
        """Sends a message to the owner"""
        guild = ctx.message.guild
        owner = discord.utils.get(ctx.bot.get_all_members(),
                                  id=ctx.bot.owner_id)
        author = ctx.message.author
        footer = _("User ID: %s") % author.id

        if ctx.guild is None:
            source = _("through DM")
        else:
            source = _("from {}").format(guild)
            footer += _(" | Server ID: %s") % guild.id

        # We need to grab the DM command prefix (global)
        # Since it can also be set through cli flags, bot.db is not a reliable
        # source. So we'll just mock a DM message instead.
        fake_message = namedtuple('Message', 'guild')
        prefixes = await ctx.bot.command_prefix(ctx.bot,
                                                fake_message(guild=None))
        prefix = prefixes[0]

        content = _("Use `{}dm {} <text>` to reply to this user"
                    "").format(prefix, author.id)

        if isinstance(author, discord.Member):
            colour = author.colour
        else:
            colour = discord.Colour.red()

        description = _("Sent by {} {}").format(author, source)

        e = discord.Embed(colour=colour, description=message)
        if author.avatar_url:
            e.set_author(name=description, icon_url=author.avatar_url)
        else:
            e.set_author(name=description)
        e.set_footer(text=footer)

        owner_not_found = _("I cannot send your message, I'm unable to find "
                            "my owner... *sigh*")

        # The owner is looked up among the members the bot can see; when
        # nobody matches there is no one to deliver the message to.
        if owner is None:
            await ctx.send(owner_not_found)
            return

        try:
            await owner.send(content, embed=e)
        except discord.InvalidArgument:
            await ctx.send(owner_not_found)
        except Exception:
            await ctx.send(_("I'm unable to deliver your message. Sorry."))
        else:
            await ctx.send(_("Your message has been sent."))

    @commands.command()
    @checks.is_owner()
    async def dm(self, ctx, user_id: int, *, message: str):
        """Sends a DM to a user

        This command needs a user id to work.
        To get a user id enable 'developer mode' in Discord's
        settings, 'appearance' tab. Then right click a user
        and copy their id"""
        destination = discord.utils.get(ctx.bot.get_all_members(),
                                        id=user_id)
        if destination is None:
            await ctx.send(_("Invalid ID or user not found. You can only "
                             "send messages to people I share a server "
                             "with."))
            return

        e = discord.Embed(colour=discord.Colour.red(), description=message)
        description = _("Owner of %s") % ctx.bot.user
        # Mock a DM message to obtain the global (DM) prefix.
        fake_message = namedtuple('Message', 'guild')
        prefixes = await ctx.bot.command_prefix(ctx.bot,
                                                fake_message(guild=None))
        prefix = prefixes[0]
        e.set_footer(text=_("You can reply to this message with %scontact"
                            "") % prefix)
        if ctx.bot.user.avatar_url:
            e.set_author(name=description, icon_url=ctx.bot.user.avatar_url)
        else:
            e.set_author(name=description)

        try:
            await destination.send(embed=e)
        except Exception:
            await ctx.send(_("Sorry, I couldn't deliver your message "
                             "to %s") % destination)
        else:
            await ctx.send(_("Message delivered to %s") % destination)

    @commands.group()
    @checks.is_owner()
    async def whitelist(self, ctx):
        """
        Whitelist management commands.
        """
        if ctx.invoked_subcommand is None:
            await ctx.send_help()

    @whitelist.command(name='add')
    async def whitelist_add(self, ctx, user: discord.User):
        """
        Adds a user to the whitelist.
        """
        async with ctx.bot.db.whitelist() as curr_list:
            if user.id not in curr_list:
                curr_list.append(user.id)

        await ctx.send(_("User added to whitelist."))

    @whitelist.command(name='list')
    async def whitelist_list(self, ctx):
        """
        Lists whitelisted users.
        """
        curr_list = await ctx.bot.db.whitelist()

        msg = _("Whitelisted Users:")
        for user in curr_list:
            # msg is a str, so entries are concatenated, not appended.
            msg += "\n\t- {}".format(user)

        for page in pagify(msg):
            await ctx.send(box(page))

    @whitelist.command(name='remove')
    async def whitelist_remove(self, ctx, user: discord.User):
        """
        Removes user from whitelist.
        """
        removed = False

        async with ctx.bot.db.whitelist() as curr_list:
            if user.id in curr_list:
                removed = True
                curr_list.remove(user.id)

        if removed:
            await ctx.send(_("User has been removed from whitelist."))
        else:
            await ctx.send(_("User was not in the whitelist."))

    @whitelist.command(name='clear')
    async def whitelist_clear(self, ctx):
        """
        Clears the whitelist.
        """
        await ctx.bot.db.whitelist.set([])
        await ctx.send(_("Whitelist has been cleared."))

    @commands.group()
    @checks.is_owner()
    async def blacklist(self, ctx):
        """
        Blacklist management commands.
        """
        if ctx.invoked_subcommand is None:
            await ctx.send_help()

    @blacklist.command(name='add')
    async def blacklist_add(self, ctx, user: discord.User):
        """
        Adds a user to the blacklist.
        """
        if await ctx.bot.is_owner(user):
            await ctx.send(_("You cannot blacklist an owner!"))
            return

        async with ctx.bot.db.blacklist() as curr_list:
            if user.id not in curr_list:
                curr_list.append(user.id)

        await ctx.send(_("User added to blacklist."))

    @blacklist.command(name='list')
    async def blacklist_list(self, ctx):
        """
        Lists blacklisted users.
        """
        curr_list = await ctx.bot.db.blacklist()

        msg = _("blacklisted Users:")
        for user in curr_list:
            # msg is a str, so entries are concatenated, not appended.
            msg += "\n\t- {}".format(user)

        for page in pagify(msg):
            await ctx.send(box(page))

    @blacklist.command(name='remove')
    async def blacklist_remove(self, ctx, user: discord.User):
        """
        Removes user from blacklist.
        """
        removed = False

        async with ctx.bot.db.blacklist() as curr_list:
            if user.id in curr_list:
                removed = True
                curr_list.remove(user.id)

        if removed:
            await ctx.send(_("User has been removed from blacklist."))
        else:
            await ctx.send(_("User was not in the blacklist."))

    @blacklist.command(name='clear')
    async def blacklist_clear(self, ctx):
        """
        Clears the blacklist.
        """
        await ctx.bot.db.blacklist.set([])
        await ctx.send(_("blacklist has been cleared."))

    # RPC handlers
    async def rpc_load(self, request):
        """RPC handler: load the package named by ``request.params[0]``.

        Raises ``LookupError`` when the cog manager returns no spec for
        that name.
        """
        cog_name = request.params[0]

        spec = await self.bot.cog_mgr.find_cog(cog_name)
        if spec is None:
            raise LookupError("No such cog found.")

        self.cleanup_and_refresh_modules(spec.name)

        # load_extension is awaited by the `load` and `reload` commands in
        # this class, so it has to be awaited here as well.
        await self.bot.load_extension(spec)

    async def rpc_unload(self, request):
        """RPC handler: unload the package named by ``request.params[0]``."""
        cog_name = request.params[0]

        self.bot.unload_extension(cog_name)

    async def rpc_reload(self, request):
        """RPC handler: unload and then load the requested package."""
        await self.rpc_unload(request)
        await self.rpc_load(request)