Red-DiscordBot/cogs/owner.py
Twentysix 27bdce7f32 [Core] CLI flags, env vars login, revamped setup, in. selfbot support, etc (#513)
Startup flags: can be started and configured without ever doing the interactive setup
Changed default settings format so that all unset values are None
Removed new cogs prompt
Removed `LOGIN_TYPE` from settings.json. It now defaults to token and fallbacks to email/password
Smarter initial setup: only asks for the settings that are actually missing
For the first installation all default cogs are loaded
Startup flag that allows settings to be memory-only
Initial selfbot support
Only reset login credentials (on confirmation) instead of deleting the whole file in case of login failure
Revamped main screen
Made sure that nothing blows up when you run Red on Windows without `chcp 65001`
Possibility of setting credentials in the environment variables `RED_TOKEN` / `RED_EMAIL` `RED_PASSWORD`. They will take priority over the configuration stored on disk.
2016-12-11 00:16:20 +01:00

836 lines
30 KiB
Python

import discord
from discord.ext import commands
from cogs.utils import checks
from __main__ import set_cog
from .utils.dataIO import dataIO
from .utils.chat_formatting import pagify, box
import importlib
import traceback
import logging
import asyncio
import threading
import datetime
import glob
import os
import aiohttp
log = logging.getLogger("red.owner")
class CogNotFoundError(Exception):
pass
class CogLoadError(Exception):
pass
class NoSetupError(CogLoadError):
pass
class CogUnloadError(Exception):
pass
class OwnerUnloadWithoutReloadError(CogUnloadError):
pass
class Owner:
"""All owner-only commands that relate to debug bot operations.
"""
def __init__(self, bot):
self.bot = bot
self.setowner_lock = False
self.file_path = "data/red/disabled_commands.json"
self.disabled_commands = dataIO.load_json(self.file_path)
self.session = aiohttp.ClientSession(loop=self.bot.loop)
def __unload(self):
self.session.close()
@commands.command()
@checks.is_owner()
async def load(self, *, module: str):
"""Loads a module
Example: load mod"""
module = module.strip()
if "cogs." not in module:
module = "cogs." + module
try:
self._load_cog(module)
except CogNotFoundError:
await self.bot.say("That module could not be found.")
except CogLoadError as e:
log.exception(e)
traceback.print_exc()
await self.bot.say("There was an issue loading the module. Check"
" your console or logs for more information.\n"
"\nError: `{}`".format(e.args[0]))
except Exception as e:
log.exception(e)
traceback.print_exc()
await self.bot.say('Module was found and possibly loaded but '
'something went wrong. Check your console '
'or logs for more information.\n\n'
'Error: `{}`'.format(e.args[0]))
else:
set_cog(module, True)
await self.disable_commands()
await self.bot.say("Module enabled.")
@commands.group(invoke_without_command=True)
@checks.is_owner()
async def unload(self, *, module: str):
"""Unloads a module
Example: unload mod"""
module = module.strip()
if "cogs." not in module:
module = "cogs." + module
if not self._does_cogfile_exist(module):
await self.bot.say("That module file doesn't exist. I will not"
" turn off autoloading at start just in case"
" this isn't supposed to happen.")
else:
set_cog(module, False)
try: # No matter what we should try to unload it
self._unload_cog(module)
except OwnerUnloadWithoutReloadError:
await self.bot.say("I cannot allow you to unload the Owner plugin"
" unless you are in the process of reloading.")
except CogUnloadError as e:
log.exception(e)
traceback.print_exc()
await self.bot.say('Unable to safely disable that module.')
else:
await self.bot.say("Module disabled.")
@unload.command(name="all")
@checks.is_owner()
async def unload_all(self):
"""Unloads all modules"""
cogs = self._list_cogs()
still_loaded = []
for cog in cogs:
set_cog(cog, False)
try:
self._unload_cog(cog)
except OwnerUnloadWithoutReloadError:
pass
except CogUnloadError as e:
log.exception(e)
traceback.print_exc()
still_loaded.append(cog)
if still_loaded:
still_loaded = ", ".join(still_loaded)
await self.bot.say("I was unable to unload some cogs: "
"{}".format(still_loaded))
else:
await self.bot.say("All cogs are now unloaded.")
@checks.is_owner()
@commands.command(name="reload")
async def _reload(self, module):
"""Reloads a module
Example: reload audio"""
if "cogs." not in module:
module = "cogs." + module
try:
self._unload_cog(module, reloading=True)
except:
pass
try:
self._load_cog(module)
except CogNotFoundError:
await self.bot.say("That module cannot be found.")
except NoSetupError:
await self.bot.say("That module does not have a setup function.")
except CogLoadError as e:
log.exception(e)
traceback.print_exc()
await self.bot.say("That module could not be loaded. Check your"
" console or logs for more information.\n\n"
"Error: `{}`".format(e.args[0]))
else:
set_cog(module, True)
await self.disable_commands()
await self.bot.say("Module reloaded.")
@commands.command(name="cogs")
@checks.is_owner()
async def _show_cogs(self):
"""Shows loaded/unloaded cogs"""
# This function assumes that all cogs are in the cogs folder,
# which is currently true.
# Extracting filename from __module__ Example: cogs.owner
loaded = [c.__module__.split(".")[1] for c in self.bot.cogs.values()]
# What's in the folder but not loaded is unloaded
unloaded = [c.split(".")[1] for c in self._list_cogs()
if c.split(".")[1] not in loaded]
if not unloaded:
unloaded = ["None"]
msg = ("+ Loaded\n"
"{}\n\n"
"- Unloaded\n"
"{}"
"".format(", ".join(sorted(loaded)),
", ".join(sorted(unloaded)))
)
for page in pagify(msg, [" "], shorten_by=16):
await self.bot.say(box(page.lstrip(" "), lang="diff"))
@commands.command(pass_context=True, hidden=True)
@checks.is_owner()
async def debug(self, ctx, *, code):
"""Evaluates code"""
def check(m):
if m.content.strip().lower() == "more":
return True
author = ctx.message.author
channel = ctx.message.channel
code = code.strip('` ')
result = None
global_vars = globals().copy()
global_vars['bot'] = self.bot
global_vars['ctx'] = ctx
global_vars['message'] = ctx.message
global_vars['author'] = ctx.message.author
global_vars['channel'] = ctx.message.channel
global_vars['server'] = ctx.message.server
try:
result = eval(code, global_vars, locals())
except Exception as e:
await self.bot.say(box('{}: {}'.format(type(e).__name__, str(e)),
lang="py"))
return
if asyncio.iscoroutine(result):
result = await result
result = str(result)
if not ctx.message.channel.is_private:
censor = (self.bot.settings.email,
self.bot.settings.password,
self.bot.settings.token)
r = "[EXPUNGED]"
for w in censor:
if w is None or w == "":
continue
result = result.replace(w, r)
result = result.replace(w.lower(), r)
result = result.replace(w.upper(), r)
result = list(pagify(result, shorten_by=16))
for i, page in enumerate(result):
if i != 0 and i % 4 == 0:
last = await self.bot.say("There are still {} messages. "
"Type `more` to continue."
"".format(len(result) - (i+1)))
msg = await self.bot.wait_for_message(author=author,
channel=channel,
check=check,
timeout=10)
if msg is None:
try:
await self.bot.delete_message(last)
except:
pass
finally:
break
await self.bot.say(box(page, lang="py"))
@commands.group(name="set", pass_context=True)
async def _set(self, ctx):
"""Changes Red's global settings."""
if ctx.invoked_subcommand is None:
await self.bot.send_cmd_help(ctx)
return
@_set.command(pass_context=True)
async def owner(self, ctx):
"""Sets owner"""
if self.bot.settings.no_prompt is True:
await self.bot.say("Console interaction is disabled. Start Red "
"without the `--no-prompt` flag to use this "
"command.")
return
if self.setowner_lock:
await self.bot.say("A set owner command is already pending.")
return
if self.bot.settings.owner is not None:
await self.bot.say(
"The owner is already set. Remember that setting the owner "
"to someone else other than who hosts the bot has security "
"repercussions and is *NOT recommended*. Proceed at your own risk."
)
await asyncio.sleep(3)
await self.bot.say("Confirm in the console that you're the owner.")
self.setowner_lock = True
t = threading.Thread(target=self._wait_for_answer,
args=(ctx.message.author,))
t.start()
@_set.command(pass_context=True)
@checks.is_owner()
async def prefix(self, ctx, *prefixes):
"""Sets Red's global prefixes
Accepts multiple prefixes separated by a space. Enclose in double
quotes if a prefix contains spaces.
Example: set prefix ! $ ? "two words" """
if prefixes == ():
await self.bot.send_cmd_help(ctx)
return
self.bot.settings.prefixes = sorted(prefixes, reverse=True)
self.bot.settings.save_settings()
log.debug("Setting global prefixes to:\n\t{}"
"".format(self.bot.settings.prefixes))
p = "prefixes" if len(prefixes) > 1 else "prefix"
await self.bot.say("Global {} set".format(p))
@_set.command(pass_context=True, no_pm=True)
@checks.serverowner_or_permissions(administrator=True)
async def serverprefix(self, ctx, *prefixes):
"""Sets Red's prefixes for this server
Accepts multiple prefixes separated by a space. Enclose in double
quotes if a prefix contains spaces.
Example: set serverprefix ! $ ? "two words"
Issuing this command with no parameters will reset the server
prefixes and the global ones will be used instead."""
server = ctx.message.server
if prefixes == ():
self.bot.settings.set_server_prefixes(server, [])
self.bot.settings.save_settings()
current_p = ", ".join(self.bot.settings.prefixes)
await self.bot.say("Server prefixes reset. Current prefixes: "
"`{}`".format(current_p))
return
prefixes = sorted(prefixes, reverse=True)
self.bot.settings.set_server_prefixes(server, prefixes)
self.bot.settings.save_settings()
log.debug("Setting server's {} prefixes to:\n\t{}"
"".format(server.id, self.bot.settings.prefixes))
p = "Prefixes" if len(prefixes) > 1 else "Prefix"
await self.bot.say("{} set for this server.\n"
"To go back to the global prefixes, do"
" `{}set serverprefix` "
"".format(p, prefixes[0]))
@_set.command(pass_context=True)
@checks.is_owner()
async def name(self, ctx, *, name):
"""Sets Red's name"""
name = name.strip()
if name != "":
try:
await self.bot.edit_profile(self.bot.settings.password,
username=name)
except:
await self.bot.say("Failed to change name. Remember that you"
" can only do it up to 2 times an hour."
"Use nicknames if you need frequent "
"changes. {}set nickname"
"".format(ctx.prefix))
else:
await self.bot.say("Done.")
else:
await self.bot.send_cmd_help(ctx)
@_set.command(pass_context=True, no_pm=True)
@checks.is_owner()
async def nickname(self, ctx, *, nickname=""):
"""Sets Red's nickname
Leaving this empty will remove it."""
nickname = nickname.strip()
if nickname == "":
nickname = None
try:
await self.bot.change_nickname(ctx.message.server.me, nickname)
await self.bot.say("Done.")
except discord.Forbidden:
await self.bot.say("I cannot do that, I lack the "
"\"Change Nickname\" permission.")
@_set.command(pass_context=True)
@checks.is_owner()
async def game(self, ctx, *, game=None):
"""Sets Red's playing status
Leaving this empty will clear it."""
server = ctx.message.server
current_status = server.me.status if server is not None else None
if game:
game = game.strip()
await self.bot.change_presence(game=discord.Game(name=game),
status=current_status)
log.debug('Status set to "{}" by owner'.format(game))
else:
await self.bot.change_presence(game=None, status=current_status)
log.debug('status cleared by owner')
await self.bot.say("Done.")
@_set.command(pass_context=True)
@checks.is_owner()
async def status(self, ctx, *, status=None):
"""Sets Red's status
Statuses:
online
idle
dnd
invisible"""
statuses = {
"online" : discord.Status.online,
"idle" : discord.Status.idle,
"dnd" : discord.Status.dnd,
"invisible" : discord.Status.invisible
}
server = ctx.message.server
current_game = server.me.game if server is not None else None
if status is None:
await self.bot.change_presence(status=discord.Status.online,
game=current_game)
await self.bot.say("Status reset.")
else:
status = statuses.get(status.lower(), None)
if status:
await self.bot.change_presence(status=status,
game=current_game)
await self.bot.say("Status changed.")
else:
await self.bot.send_cmd_help(ctx)
@_set.command(pass_context=True)
@checks.is_owner()
async def stream(self, ctx, streamer=None, *, stream_title=None):
"""Sets Red's streaming status
Leaving both streamer and stream_title empty will clear it."""
server = ctx.message.server
current_status = server.me.status if server is not None else None
if stream_title:
stream_title = stream_title.strip()
if "twitch.tv/" not in streamer:
streamer = "https://www.twitch.tv/" + streamer
game = discord.Game(type=1, url=streamer, name=stream_title)
await self.bot.change_presence(game=game, status=current_status)
log.debug('Owner has set streaming status and url to "{}" and {}'.format(stream_title, streamer))
elif streamer is not None:
await self.bot.send_cmd_help(ctx)
return
else:
await self.bot.change_presence(game=None, status=current_status)
log.debug('stream cleared by owner')
await self.bot.say("Done.")
@_set.command()
@checks.is_owner()
async def avatar(self, url):
"""Sets Red's avatar"""
try:
async with self.session.get(url) as r:
data = await r.read()
await self.bot.edit_profile(self.bot.settings.password, avatar=data)
await self.bot.say("Done.")
log.debug("changed avatar")
except Exception as e:
await self.bot.say("Error, check your console or logs for "
"more information.")
log.exception(e)
traceback.print_exc()
@_set.command(name="token")
@checks.is_owner()
async def _token(self, token):
"""Sets Red's login token"""
if len(token) < 50:
await self.bot.say("Invalid token.")
else:
self.bot.settings.token = token
self.bot.settings.save_settings()
await self.bot.say("Token set. Restart me.")
log.debug("Token changed.")
@commands.command()
@checks.is_owner()
async def shutdown(self):
"""Shuts down Red"""
await self.bot.logout()
@commands.group(name="command", pass_context=True)
@checks.is_owner()
async def command_disabler(self, ctx):
"""Disables/enables commands
With no subcommands returns the disabled commands list"""
if ctx.invoked_subcommand is None:
await self.bot.send_cmd_help(ctx)
if self.disabled_commands:
msg = "Disabled commands:\n```xl\n"
for cmd in self.disabled_commands:
msg += "{}, ".format(cmd)
msg = msg.strip(", ")
await self.bot.whisper("{}```".format(msg))
@command_disabler.command()
async def disable(self, *, command):
"""Disables commands/subcommands"""
comm_obj = await self.get_command(command)
if comm_obj is KeyError:
await self.bot.say("That command doesn't seem to exist.")
elif comm_obj is False:
await self.bot.say("You cannot disable owner restricted commands.")
else:
comm_obj.enabled = False
comm_obj.hidden = True
self.disabled_commands.append(command)
dataIO.save_json(self.file_path, self.disabled_commands)
await self.bot.say("Command has been disabled.")
@command_disabler.command()
async def enable(self, *, command):
"""Enables commands/subcommands"""
if command in self.disabled_commands:
self.disabled_commands.remove(command)
dataIO.save_json(self.file_path, self.disabled_commands)
await self.bot.say("Command enabled.")
else:
await self.bot.say("That command is not disabled.")
return
try:
comm_obj = await self.get_command(command)
comm_obj.enabled = True
comm_obj.hidden = False
except: # In case it was in the disabled list but not currently loaded
pass # No point in even checking what returns
async def get_command(self, command):
command = command.split()
try:
comm_obj = self.bot.commands[command[0]]
if len(command) > 1:
command.pop(0)
for cmd in command:
comm_obj = comm_obj.commands[cmd]
except KeyError:
return KeyError
for check in comm_obj.checks:
if hasattr(check, "__name__") and check.__name__ == "is_owner_check":
return False
return comm_obj
async def disable_commands(self): # runs at boot
for cmd in self.disabled_commands:
cmd_obj = await self.get_command(cmd)
try:
cmd_obj.enabled = False
cmd_obj.hidden = True
except:
pass
@commands.command()
@checks.is_owner()
async def join(self, invite_url: discord.Invite=None):
"""Joins new server"""
if hasattr(self.bot.user, 'bot') and self.bot.user.bot is True:
# Check to ensure they're using updated discord.py
msg = ("I have a **BOT** tag, so I must be invited with an OAuth2"
" link:\nFor more information: "
"https://twentysix26.github.io/"
"Red-Docs/red_guide_bot_accounts/#bot-invites")
await self.bot.say(msg)
if hasattr(self.bot, 'oauth_url'):
await self.bot.whisper("Here's my OAUTH2 link:\n{}".format(
self.bot.oauth_url))
return
if invite_url is None:
await self.bot.say("I need a Discord Invite link for the "
"server you want me to join.")
return
try:
await self.bot.accept_invite(invite_url)
await self.bot.say("Server joined.")
log.debug("We just joined {}".format(invite_url))
except discord.NotFound:
await self.bot.say("The invite was invalid or expired.")
except discord.HTTPException:
await self.bot.say("I wasn't able to accept the invite."
" Try again.")
@commands.command(pass_context=True, no_pm=True)
@checks.is_owner()
async def leave(self, ctx):
"""Leaves server"""
message = ctx.message
await self.bot.say("Are you sure you want me to leave this server?"
" Type yes to confirm.")
response = await self.bot.wait_for_message(author=message.author)
if response.content.lower().strip() == "yes":
await self.bot.say("Alright. Bye :wave:")
log.debug('Leaving "{}"'.format(message.server.name))
await self.bot.leave_server(message.server)
else:
await self.bot.say("Ok I'll stay here then.")
@commands.command(pass_context=True)
@checks.is_owner()
async def servers(self, ctx):
"""Lists and allows to leave servers"""
owner = ctx.message.author
servers = sorted(list(self.bot.servers),
key=lambda s: s.name.lower())
msg = ""
for i, server in enumerate(servers):
msg += "{}: {}\n".format(i, server.name)
msg += "\nTo leave a server just type its number."
for page in pagify(msg, ['\n']):
await self.bot.say(page)
while msg is not None:
msg = await self.bot.wait_for_message(author=owner, timeout=15)
try:
msg = int(msg.content)
await self.leave_confirmation(servers[msg], owner, ctx)
break
except (IndexError, ValueError, AttributeError):
pass
async def leave_confirmation(self, server, owner, ctx):
await self.bot.say("Are you sure you want me "
"to leave {}? (yes/no)".format(server.name))
msg = await self.bot.wait_for_message(author=owner, timeout=15)
if msg is None:
await self.bot.say("I guess not.")
elif msg.content.lower().strip() in ("yes", "y"):
await self.bot.leave_server(server)
if server != ctx.message.server:
await self.bot.say("Done.")
else:
await self.bot.say("Alright then.")
@commands.command(pass_context=True)
async def contact(self, ctx, *, message : str):
"""Sends message to the owner"""
if self.bot.settings.owner is None:
await self.bot.say("I have no owner set.")
return
owner = discord.utils.get(self.bot.get_all_members(),
id=self.bot.settings.owner)
author = ctx.message.author
if ctx.message.channel.is_private is False:
server = ctx.message.server
source = ", server **{}** ({})".format(server.name, server.id)
else:
source = ", direct message"
sender = "From **{}** ({}){}:\n\n".format(author, author.id, source)
message = sender + message
try:
await self.bot.send_message(owner, message)
except discord.errors.InvalidArgument:
await self.bot.say("I cannot send your message, I'm unable to find"
" my owner... *sigh*")
except discord.errors.HTTPException:
await self.bot.say("Your message is too long.")
except:
await self.bot.say("I'm unable to deliver your message. Sorry.")
else:
await self.bot.say("Your message has been sent.")
@commands.command()
async def info(self):
"""Shows info about Red"""
author_repo = "https://github.com/Twentysix26"
red_repo = author_repo + "/Red-DiscordBot"
server_url = "https://discord.me/Red-DiscordBot"
dpy_repo = "https://github.com/Rapptz/discord.py"
python_url = "https://www.python.org/"
since = datetime.datetime(2016, 1, 2, 0, 0)
days_since = (datetime.datetime.now() - since).days
dpy_version = "[{}]({})".format(discord.__version__, dpy_repo)
py_version = "[{}.{}.{}]({})".format(*os.sys.version_info[:3],
python_url)
owner_set = self.bot.settings.owner is not None
owner = self.bot.settings.owner if owner_set else None
if owner:
owner = discord.utils.get(self.bot.get_all_members(), id=owner)
if not owner:
try:
owner = await self.bot.get_user_info(self.bot.settings.owner)
except:
owner = None
if not owner:
owner = "Unknown"
about = (
"This is an instance of [Red, an open source Discord bot]({}) "
"created by [Twentysix]({}) and improved by many.\n\n"
"Red is backed by a passionate community who contributes and "
"creates content for everyone to enjoy. [Join us today]({}) "
"and help us improve!\n\n"
"".format(red_repo, author_repo, server_url))
embed = discord.Embed(colour=discord.Colour.red())
embed.add_field(name="Instance owned by", value=str(owner))
embed.add_field(name="Python", value=py_version)
embed.add_field(name="discord.py", value=dpy_version)
embed.add_field(name="About Red", value=about, inline=False)
embed.set_footer(text="Bringing joy since 02 Jan 2016 (over "
"{} days ago!)".format(days_since))
try:
await self.bot.say(embed=embed)
except discord.HTTPException:
await self.bot.say("I need the `Embed links` permission "
"to send this")
@commands.command()
async def uptime(self):
"""Shows Red's uptime"""
now = datetime.datetime.now()
uptime = (now - self.bot.uptime).seconds
uptime = datetime.timedelta(seconds=uptime)
await self.bot.say("`Uptime: {}`".format(uptime))
@commands.command()
async def version(self):
"""Shows Red's current version"""
response = self.bot.loop.run_in_executor(None, self._get_version)
result = await asyncio.wait_for(response, timeout=10)
try:
await self.bot.say(embed=result)
except discord.HTTPException:
await self.bot.say("I need the `Embed links` permission "
"to send this")
def _load_cog(self, cogname):
if not self._does_cogfile_exist(cogname):
raise CogNotFoundError(cogname)
try:
mod_obj = importlib.import_module(cogname)
importlib.reload(mod_obj)
self.bot.load_extension(mod_obj.__name__)
except SyntaxError as e:
raise CogLoadError(*e.args)
except:
raise
def _unload_cog(self, cogname, reloading=False):
if not reloading and cogname == "cogs.owner":
raise OwnerUnloadWithoutReloadError(
"Can't unload the owner plugin :P")
try:
self.bot.unload_extension(cogname)
except:
raise CogUnloadError
def _list_cogs(self):
cogs = [os.path.basename(f) for f in glob.glob("cogs/*.py")]
return ["cogs." + os.path.splitext(f)[0] for f in cogs]
def _does_cogfile_exist(self, module):
if "cogs." not in module:
module = "cogs." + module
if module not in self._list_cogs():
return False
return True
def _wait_for_answer(self, author):
print(author.name + " requested to be set as owner. If this is you, "
"type 'yes'. Otherwise press enter.")
print()
print("*DO NOT* set anyone else as owner. This has security "
"repercussions.")
choice = "None"
while choice.lower() != "yes" and choice == "None":
choice = input("> ")
if choice == "yes":
self.bot.settings.owner = author.id
self.bot.settings.save_settings()
print(author.name + " has been set as owner.")
self.setowner_lock = False
self.owner.hidden = True
else:
print("The set owner request has been ignored.")
self.setowner_lock = False
def _get_version(self):
url = os.popen(r'git config --get remote.origin.url')
url = url.read().strip()[:-4]
repo_name = url.split("/")[-1]
commits = os.popen(r'git show -s -n 3 HEAD --format="%cr|%s|%H"')
ncommits = os.popen(r'git rev-list --count HEAD').read()
lines = commits.read().split('\n')
embed = discord.Embed(title="Updates of " + repo_name,
description="Last three updates",
colour=discord.Colour.red(),
url=url)
for line in lines:
if not line:
continue
when, commit, chash = line.split("|")
commit_url = url + "/commit/" + chash
content = "[{}]({}) - {} ".format(chash[:6], commit_url, commit)
embed.add_field(name=when, value=content, inline=False)
embed.set_footer(text="Total commits: " + ncommits)
return embed
def check_files():
if not os.path.isfile("data/red/disabled_commands.json"):
print("Creating empty disabled_commands.json...")
dataIO.save_json("data/red/disabled_commands.json", [])
def setup(bot):
check_files()
n = Owner(bot)
bot.add_cog(n)