mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 11:18:54 -05:00
Implemented part of #561's changes Handles non-git installations and non-default remotes Links to branch Chained commands for better peformances (theorically)
912 lines
33 KiB
Python
912 lines
33 KiB
Python
import discord
|
|
from discord.ext import commands
|
|
from cogs.utils import checks
|
|
from __main__ import set_cog
|
|
from .utils.dataIO import dataIO
|
|
from .utils.chat_formatting import pagify, box
|
|
|
|
import importlib
|
|
import traceback
|
|
import logging
|
|
import asyncio
|
|
import threading
|
|
import datetime
|
|
import glob
|
|
import os
|
|
import aiohttp
|
|
|
|
log = logging.getLogger("red.owner")
|
|
|
|
|
|
class CogNotFoundError(Exception):
|
|
pass
|
|
|
|
|
|
class CogLoadError(Exception):
|
|
pass
|
|
|
|
|
|
class NoSetupError(CogLoadError):
|
|
pass
|
|
|
|
|
|
class CogUnloadError(Exception):
|
|
pass
|
|
|
|
|
|
class OwnerUnloadWithoutReloadError(CogUnloadError):
|
|
pass
|
|
|
|
|
|
class Owner:
|
|
"""All owner-only commands that relate to debug bot operations.
|
|
"""
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.setowner_lock = False
|
|
self.file_path = "data/red/disabled_commands.json"
|
|
self.disabled_commands = dataIO.load_json(self.file_path)
|
|
self.session = aiohttp.ClientSession(loop=self.bot.loop)
|
|
|
|
def __unload(self):
|
|
self.session.close()
|
|
|
|
@commands.command()
|
|
@checks.is_owner()
|
|
async def load(self, *, module: str):
|
|
"""Loads a module
|
|
|
|
Example: load mod"""
|
|
module = module.strip()
|
|
if "cogs." not in module:
|
|
module = "cogs." + module
|
|
try:
|
|
self._load_cog(module)
|
|
except CogNotFoundError:
|
|
await self.bot.say("That module could not be found.")
|
|
except CogLoadError as e:
|
|
log.exception(e)
|
|
traceback.print_exc()
|
|
await self.bot.say("There was an issue loading the module. Check"
|
|
" your console or logs for more information.\n"
|
|
"\nError: `{}`".format(e.args[0]))
|
|
except Exception as e:
|
|
log.exception(e)
|
|
traceback.print_exc()
|
|
await self.bot.say('Module was found and possibly loaded but '
|
|
'something went wrong. Check your console '
|
|
'or logs for more information.\n\n'
|
|
'Error: `{}`'.format(e.args[0]))
|
|
else:
|
|
set_cog(module, True)
|
|
await self.disable_commands()
|
|
await self.bot.say("Module enabled.")
|
|
|
|
@commands.group(invoke_without_command=True)
|
|
@checks.is_owner()
|
|
async def unload(self, *, module: str):
|
|
"""Unloads a module
|
|
|
|
Example: unload mod"""
|
|
module = module.strip()
|
|
if "cogs." not in module:
|
|
module = "cogs." + module
|
|
if not self._does_cogfile_exist(module):
|
|
await self.bot.say("That module file doesn't exist. I will not"
|
|
" turn off autoloading at start just in case"
|
|
" this isn't supposed to happen.")
|
|
else:
|
|
set_cog(module, False)
|
|
try: # No matter what we should try to unload it
|
|
self._unload_cog(module)
|
|
except OwnerUnloadWithoutReloadError:
|
|
await self.bot.say("I cannot allow you to unload the Owner plugin"
|
|
" unless you are in the process of reloading.")
|
|
except CogUnloadError as e:
|
|
log.exception(e)
|
|
traceback.print_exc()
|
|
await self.bot.say('Unable to safely disable that module.')
|
|
else:
|
|
await self.bot.say("Module disabled.")
|
|
|
|
@unload.command(name="all")
|
|
@checks.is_owner()
|
|
async def unload_all(self):
|
|
"""Unloads all modules"""
|
|
cogs = self._list_cogs()
|
|
still_loaded = []
|
|
for cog in cogs:
|
|
set_cog(cog, False)
|
|
try:
|
|
self._unload_cog(cog)
|
|
except OwnerUnloadWithoutReloadError:
|
|
pass
|
|
except CogUnloadError as e:
|
|
log.exception(e)
|
|
traceback.print_exc()
|
|
still_loaded.append(cog)
|
|
if still_loaded:
|
|
still_loaded = ", ".join(still_loaded)
|
|
await self.bot.say("I was unable to unload some cogs: "
|
|
"{}".format(still_loaded))
|
|
else:
|
|
await self.bot.say("All cogs are now unloaded.")
|
|
|
|
@checks.is_owner()
|
|
@commands.command(name="reload")
|
|
async def _reload(self, module):
|
|
"""Reloads a module
|
|
|
|
Example: reload audio"""
|
|
if "cogs." not in module:
|
|
module = "cogs." + module
|
|
|
|
try:
|
|
self._unload_cog(module, reloading=True)
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
self._load_cog(module)
|
|
except CogNotFoundError:
|
|
await self.bot.say("That module cannot be found.")
|
|
except NoSetupError:
|
|
await self.bot.say("That module does not have a setup function.")
|
|
except CogLoadError as e:
|
|
log.exception(e)
|
|
traceback.print_exc()
|
|
await self.bot.say("That module could not be loaded. Check your"
|
|
" console or logs for more information.\n\n"
|
|
"Error: `{}`".format(e.args[0]))
|
|
else:
|
|
set_cog(module, True)
|
|
await self.disable_commands()
|
|
await self.bot.say("Module reloaded.")
|
|
|
|
@commands.command(name="cogs")
|
|
@checks.is_owner()
|
|
async def _show_cogs(self):
|
|
"""Shows loaded/unloaded cogs"""
|
|
# This function assumes that all cogs are in the cogs folder,
|
|
# which is currently true.
|
|
|
|
# Extracting filename from __module__ Example: cogs.owner
|
|
loaded = [c.__module__.split(".")[1] for c in self.bot.cogs.values()]
|
|
# What's in the folder but not loaded is unloaded
|
|
unloaded = [c.split(".")[1] for c in self._list_cogs()
|
|
if c.split(".")[1] not in loaded]
|
|
|
|
if not unloaded:
|
|
unloaded = ["None"]
|
|
|
|
msg = ("+ Loaded\n"
|
|
"{}\n\n"
|
|
"- Unloaded\n"
|
|
"{}"
|
|
"".format(", ".join(sorted(loaded)),
|
|
", ".join(sorted(unloaded)))
|
|
)
|
|
for page in pagify(msg, [" "], shorten_by=16):
|
|
await self.bot.say(box(page.lstrip(" "), lang="diff"))
|
|
|
|
@commands.command(pass_context=True, hidden=True)
|
|
@checks.is_owner()
|
|
async def debug(self, ctx, *, code):
|
|
"""Evaluates code"""
|
|
def check(m):
|
|
if m.content.strip().lower() == "more":
|
|
return True
|
|
|
|
author = ctx.message.author
|
|
channel = ctx.message.channel
|
|
|
|
code = code.strip('` ')
|
|
result = None
|
|
|
|
global_vars = globals().copy()
|
|
global_vars['bot'] = self.bot
|
|
global_vars['ctx'] = ctx
|
|
global_vars['message'] = ctx.message
|
|
global_vars['author'] = ctx.message.author
|
|
global_vars['channel'] = ctx.message.channel
|
|
global_vars['server'] = ctx.message.server
|
|
|
|
try:
|
|
result = eval(code, global_vars, locals())
|
|
except Exception as e:
|
|
await self.bot.say(box('{}: {}'.format(type(e).__name__, str(e)),
|
|
lang="py"))
|
|
return
|
|
|
|
if asyncio.iscoroutine(result):
|
|
result = await result
|
|
|
|
result = str(result)
|
|
|
|
if not ctx.message.channel.is_private:
|
|
censor = (self.bot.settings.email,
|
|
self.bot.settings.password,
|
|
self.bot.settings.token)
|
|
r = "[EXPUNGED]"
|
|
for w in censor:
|
|
if w is None or w == "":
|
|
continue
|
|
result = result.replace(w, r)
|
|
result = result.replace(w.lower(), r)
|
|
result = result.replace(w.upper(), r)
|
|
|
|
result = list(pagify(result, shorten_by=16))
|
|
|
|
for i, page in enumerate(result):
|
|
if i != 0 and i % 4 == 0:
|
|
last = await self.bot.say("There are still {} messages. "
|
|
"Type `more` to continue."
|
|
"".format(len(result) - (i+1)))
|
|
msg = await self.bot.wait_for_message(author=author,
|
|
channel=channel,
|
|
check=check,
|
|
timeout=10)
|
|
if msg is None:
|
|
try:
|
|
await self.bot.delete_message(last)
|
|
except:
|
|
pass
|
|
finally:
|
|
break
|
|
await self.bot.say(box(page, lang="py"))
|
|
|
|
@commands.group(name="set", pass_context=True)
|
|
async def _set(self, ctx):
|
|
"""Changes Red's global settings."""
|
|
if ctx.invoked_subcommand is None:
|
|
await self.bot.send_cmd_help(ctx)
|
|
return
|
|
|
|
@_set.command(pass_context=True)
|
|
async def owner(self, ctx):
|
|
"""Sets owner"""
|
|
if self.bot.settings.no_prompt is True:
|
|
await self.bot.say("Console interaction is disabled. Start Red "
|
|
"without the `--no-prompt` flag to use this "
|
|
"command.")
|
|
return
|
|
if self.setowner_lock:
|
|
await self.bot.say("A set owner command is already pending.")
|
|
return
|
|
|
|
if self.bot.settings.owner is not None:
|
|
await self.bot.say(
|
|
"The owner is already set. Remember that setting the owner "
|
|
"to someone else other than who hosts the bot has security "
|
|
"repercussions and is *NOT recommended*. Proceed at your own risk."
|
|
)
|
|
await asyncio.sleep(3)
|
|
|
|
await self.bot.say("Confirm in the console that you're the owner.")
|
|
self.setowner_lock = True
|
|
t = threading.Thread(target=self._wait_for_answer,
|
|
args=(ctx.message.author,))
|
|
t.start()
|
|
|
|
@_set.command()
|
|
@checks.is_owner()
|
|
async def defaultmodrole(self, *, role_name: str):
|
|
"""Sets the default mod role name
|
|
|
|
This is used if a server-specific role is not set"""
|
|
self.bot.settings.default_mod = role_name
|
|
self.bot.settings.save_settings()
|
|
await self.bot.say("The default mod role name has been set.")
|
|
|
|
@_set.command()
|
|
@checks.is_owner()
|
|
async def defaultadminrole(self, *, role_name: str):
|
|
"""Sets the default admin role name
|
|
|
|
This is used if a server-specific role is not set"""
|
|
self.bot.settings.default_admin = role_name
|
|
self.bot.settings.save_settings()
|
|
await self.bot.say("The default admin role name has been set.")
|
|
|
|
@_set.command(pass_context=True)
|
|
@checks.is_owner()
|
|
async def prefix(self, ctx, *prefixes):
|
|
"""Sets Red's global prefixes
|
|
|
|
Accepts multiple prefixes separated by a space. Enclose in double
|
|
quotes if a prefix contains spaces.
|
|
Example: set prefix ! $ ? "two words" """
|
|
if prefixes == ():
|
|
await self.bot.send_cmd_help(ctx)
|
|
return
|
|
|
|
self.bot.settings.prefixes = sorted(prefixes, reverse=True)
|
|
self.bot.settings.save_settings()
|
|
log.debug("Setting global prefixes to:\n\t{}"
|
|
"".format(self.bot.settings.prefixes))
|
|
|
|
p = "prefixes" if len(prefixes) > 1 else "prefix"
|
|
await self.bot.say("Global {} set".format(p))
|
|
|
|
@_set.command(pass_context=True, no_pm=True)
|
|
@checks.serverowner_or_permissions(administrator=True)
|
|
async def serverprefix(self, ctx, *prefixes):
|
|
"""Sets Red's prefixes for this server
|
|
|
|
Accepts multiple prefixes separated by a space. Enclose in double
|
|
quotes if a prefix contains spaces.
|
|
Example: set serverprefix ! $ ? "two words"
|
|
|
|
Issuing this command with no parameters will reset the server
|
|
prefixes and the global ones will be used instead."""
|
|
server = ctx.message.server
|
|
|
|
if prefixes == ():
|
|
self.bot.settings.set_server_prefixes(server, [])
|
|
self.bot.settings.save_settings()
|
|
current_p = ", ".join(self.bot.settings.prefixes)
|
|
await self.bot.say("Server prefixes reset. Current prefixes: "
|
|
"`{}`".format(current_p))
|
|
return
|
|
|
|
prefixes = sorted(prefixes, reverse=True)
|
|
self.bot.settings.set_server_prefixes(server, prefixes)
|
|
self.bot.settings.save_settings()
|
|
log.debug("Setting server's {} prefixes to:\n\t{}"
|
|
"".format(server.id, self.bot.settings.prefixes))
|
|
|
|
p = "Prefixes" if len(prefixes) > 1 else "Prefix"
|
|
await self.bot.say("{} set for this server.\n"
|
|
"To go back to the global prefixes, do"
|
|
" `{}set serverprefix` "
|
|
"".format(p, prefixes[0]))
|
|
|
|
@_set.command(pass_context=True)
|
|
@checks.is_owner()
|
|
async def name(self, ctx, *, name):
|
|
"""Sets Red's name"""
|
|
name = name.strip()
|
|
if name != "":
|
|
try:
|
|
await self.bot.edit_profile(self.bot.settings.password,
|
|
username=name)
|
|
except:
|
|
await self.bot.say("Failed to change name. Remember that you"
|
|
" can only do it up to 2 times an hour."
|
|
"Use nicknames if you need frequent "
|
|
"changes. {}set nickname"
|
|
"".format(ctx.prefix))
|
|
else:
|
|
await self.bot.say("Done.")
|
|
else:
|
|
await self.bot.send_cmd_help(ctx)
|
|
|
|
@_set.command(pass_context=True, no_pm=True)
|
|
@checks.is_owner()
|
|
async def nickname(self, ctx, *, nickname=""):
|
|
"""Sets Red's nickname
|
|
|
|
Leaving this empty will remove it."""
|
|
nickname = nickname.strip()
|
|
if nickname == "":
|
|
nickname = None
|
|
try:
|
|
await self.bot.change_nickname(ctx.message.server.me, nickname)
|
|
await self.bot.say("Done.")
|
|
except discord.Forbidden:
|
|
await self.bot.say("I cannot do that, I lack the "
|
|
"\"Change Nickname\" permission.")
|
|
|
|
@_set.command(pass_context=True)
|
|
@checks.is_owner()
|
|
async def game(self, ctx, *, game=None):
|
|
"""Sets Red's playing status
|
|
|
|
Leaving this empty will clear it."""
|
|
|
|
server = ctx.message.server
|
|
|
|
current_status = server.me.status if server is not None else None
|
|
|
|
if game:
|
|
game = game.strip()
|
|
await self.bot.change_presence(game=discord.Game(name=game),
|
|
status=current_status)
|
|
log.debug('Status set to "{}" by owner'.format(game))
|
|
else:
|
|
await self.bot.change_presence(game=None, status=current_status)
|
|
log.debug('status cleared by owner')
|
|
await self.bot.say("Done.")
|
|
|
|
@_set.command(pass_context=True)
|
|
@checks.is_owner()
|
|
async def status(self, ctx, *, status=None):
|
|
"""Sets Red's status
|
|
|
|
Statuses:
|
|
online
|
|
idle
|
|
dnd
|
|
invisible"""
|
|
|
|
statuses = {
|
|
"online" : discord.Status.online,
|
|
"idle" : discord.Status.idle,
|
|
"dnd" : discord.Status.dnd,
|
|
"invisible" : discord.Status.invisible
|
|
}
|
|
|
|
server = ctx.message.server
|
|
|
|
current_game = server.me.game if server is not None else None
|
|
|
|
if status is None:
|
|
await self.bot.change_presence(status=discord.Status.online,
|
|
game=current_game)
|
|
await self.bot.say("Status reset.")
|
|
else:
|
|
status = statuses.get(status.lower(), None)
|
|
if status:
|
|
await self.bot.change_presence(status=status,
|
|
game=current_game)
|
|
await self.bot.say("Status changed.")
|
|
else:
|
|
await self.bot.send_cmd_help(ctx)
|
|
|
|
@_set.command(pass_context=True)
|
|
@checks.is_owner()
|
|
async def stream(self, ctx, streamer=None, *, stream_title=None):
|
|
"""Sets Red's streaming status
|
|
|
|
Leaving both streamer and stream_title empty will clear it."""
|
|
|
|
server = ctx.message.server
|
|
|
|
current_status = server.me.status if server is not None else None
|
|
|
|
if stream_title:
|
|
stream_title = stream_title.strip()
|
|
if "twitch.tv/" not in streamer:
|
|
streamer = "https://www.twitch.tv/" + streamer
|
|
game = discord.Game(type=1, url=streamer, name=stream_title)
|
|
await self.bot.change_presence(game=game, status=current_status)
|
|
log.debug('Owner has set streaming status and url to "{}" and {}'.format(stream_title, streamer))
|
|
elif streamer is not None:
|
|
await self.bot.send_cmd_help(ctx)
|
|
return
|
|
else:
|
|
await self.bot.change_presence(game=None, status=current_status)
|
|
log.debug('stream cleared by owner')
|
|
await self.bot.say("Done.")
|
|
|
|
@_set.command()
|
|
@checks.is_owner()
|
|
async def avatar(self, url):
|
|
"""Sets Red's avatar"""
|
|
try:
|
|
async with self.session.get(url) as r:
|
|
data = await r.read()
|
|
await self.bot.edit_profile(self.bot.settings.password, avatar=data)
|
|
await self.bot.say("Done.")
|
|
log.debug("changed avatar")
|
|
except Exception as e:
|
|
await self.bot.say("Error, check your console or logs for "
|
|
"more information.")
|
|
log.exception(e)
|
|
traceback.print_exc()
|
|
|
|
@_set.command(name="token")
|
|
@checks.is_owner()
|
|
async def _token(self, token):
|
|
"""Sets Red's login token"""
|
|
if len(token) < 50:
|
|
await self.bot.say("Invalid token.")
|
|
else:
|
|
self.bot.settings.token = token
|
|
self.bot.settings.save_settings()
|
|
await self.bot.say("Token set. Restart me.")
|
|
log.debug("Token changed.")
|
|
|
|
@commands.command()
|
|
@checks.is_owner()
|
|
async def shutdown(self, silently : bool=False):
|
|
"""Shuts down Red"""
|
|
wave = "\N{WAVING HAND SIGN}"
|
|
skin = "\N{EMOJI MODIFIER FITZPATRICK TYPE-3}"
|
|
try: # We don't want missing perms to stop our shutdown
|
|
if not silently:
|
|
await self.bot.say("Shutting down... " + wave + skin)
|
|
except:
|
|
pass
|
|
await self.bot.shutdown()
|
|
|
|
@commands.command()
|
|
@checks.is_owner()
|
|
async def restart(self, silently : bool=False):
|
|
"""Attempts to restart Red
|
|
|
|
Makes Red quit with exit code 26
|
|
The restart is not guaranteed: it must be dealt
|
|
with by the process manager in use"""
|
|
try:
|
|
if not silently:
|
|
await self.bot.say("Restarting...")
|
|
except:
|
|
pass
|
|
await self.bot.shutdown(restart=True)
|
|
|
|
@commands.group(name="command", pass_context=True)
|
|
@checks.is_owner()
|
|
async def command_disabler(self, ctx):
|
|
"""Disables/enables commands
|
|
|
|
With no subcommands returns the disabled commands list"""
|
|
if ctx.invoked_subcommand is None:
|
|
await self.bot.send_cmd_help(ctx)
|
|
if self.disabled_commands:
|
|
msg = "Disabled commands:\n```xl\n"
|
|
for cmd in self.disabled_commands:
|
|
msg += "{}, ".format(cmd)
|
|
msg = msg.strip(", ")
|
|
await self.bot.whisper("{}```".format(msg))
|
|
|
|
@command_disabler.command()
|
|
async def disable(self, *, command):
|
|
"""Disables commands/subcommands"""
|
|
comm_obj = await self.get_command(command)
|
|
if comm_obj is KeyError:
|
|
await self.bot.say("That command doesn't seem to exist.")
|
|
elif comm_obj is False:
|
|
await self.bot.say("You cannot disable owner restricted commands.")
|
|
else:
|
|
comm_obj.enabled = False
|
|
comm_obj.hidden = True
|
|
self.disabled_commands.append(command)
|
|
dataIO.save_json(self.file_path, self.disabled_commands)
|
|
await self.bot.say("Command has been disabled.")
|
|
|
|
@command_disabler.command()
|
|
async def enable(self, *, command):
|
|
"""Enables commands/subcommands"""
|
|
if command in self.disabled_commands:
|
|
self.disabled_commands.remove(command)
|
|
dataIO.save_json(self.file_path, self.disabled_commands)
|
|
await self.bot.say("Command enabled.")
|
|
else:
|
|
await self.bot.say("That command is not disabled.")
|
|
return
|
|
try:
|
|
comm_obj = await self.get_command(command)
|
|
comm_obj.enabled = True
|
|
comm_obj.hidden = False
|
|
except: # In case it was in the disabled list but not currently loaded
|
|
pass # No point in even checking what returns
|
|
|
|
async def get_command(self, command):
|
|
command = command.split()
|
|
try:
|
|
comm_obj = self.bot.commands[command[0]]
|
|
if len(command) > 1:
|
|
command.pop(0)
|
|
for cmd in command:
|
|
comm_obj = comm_obj.commands[cmd]
|
|
except KeyError:
|
|
return KeyError
|
|
for check in comm_obj.checks:
|
|
if hasattr(check, "__name__") and check.__name__ == "is_owner_check":
|
|
return False
|
|
return comm_obj
|
|
|
|
async def disable_commands(self): # runs at boot
|
|
for cmd in self.disabled_commands:
|
|
cmd_obj = await self.get_command(cmd)
|
|
try:
|
|
cmd_obj.enabled = False
|
|
cmd_obj.hidden = True
|
|
except:
|
|
pass
|
|
|
|
@commands.command()
|
|
@checks.is_owner()
|
|
async def join(self, invite_url: discord.Invite=None):
|
|
"""Joins new server"""
|
|
if hasattr(self.bot.user, 'bot') and self.bot.user.bot is True:
|
|
# Check to ensure they're using updated discord.py
|
|
msg = ("I have a **BOT** tag, so I must be invited with an OAuth2"
|
|
" link:\nFor more information: "
|
|
"https://twentysix26.github.io/"
|
|
"Red-Docs/red_guide_bot_accounts/#bot-invites")
|
|
await self.bot.say(msg)
|
|
if hasattr(self.bot, 'oauth_url'):
|
|
await self.bot.whisper("Here's my OAUTH2 link:\n{}".format(
|
|
self.bot.oauth_url))
|
|
return
|
|
|
|
if invite_url is None:
|
|
await self.bot.say("I need a Discord Invite link for the "
|
|
"server you want me to join.")
|
|
return
|
|
|
|
try:
|
|
await self.bot.accept_invite(invite_url)
|
|
await self.bot.say("Server joined.")
|
|
log.debug("We just joined {}".format(invite_url))
|
|
except discord.NotFound:
|
|
await self.bot.say("The invite was invalid or expired.")
|
|
except discord.HTTPException:
|
|
await self.bot.say("I wasn't able to accept the invite."
|
|
" Try again.")
|
|
|
|
@commands.command(pass_context=True, no_pm=True)
|
|
@checks.is_owner()
|
|
async def leave(self, ctx):
|
|
"""Leaves server"""
|
|
message = ctx.message
|
|
|
|
await self.bot.say("Are you sure you want me to leave this server?"
|
|
" Type yes to confirm.")
|
|
response = await self.bot.wait_for_message(author=message.author)
|
|
|
|
if response.content.lower().strip() == "yes":
|
|
await self.bot.say("Alright. Bye :wave:")
|
|
log.debug('Leaving "{}"'.format(message.server.name))
|
|
await self.bot.leave_server(message.server)
|
|
else:
|
|
await self.bot.say("Ok I'll stay here then.")
|
|
|
|
@commands.command(pass_context=True)
|
|
@checks.is_owner()
|
|
async def servers(self, ctx):
|
|
"""Lists and allows to leave servers"""
|
|
owner = ctx.message.author
|
|
servers = sorted(list(self.bot.servers),
|
|
key=lambda s: s.name.lower())
|
|
msg = ""
|
|
for i, server in enumerate(servers):
|
|
msg += "{}: {}\n".format(i, server.name)
|
|
msg += "\nTo leave a server just type its number."
|
|
|
|
for page in pagify(msg, ['\n']):
|
|
await self.bot.say(page)
|
|
|
|
while msg is not None:
|
|
msg = await self.bot.wait_for_message(author=owner, timeout=15)
|
|
try:
|
|
msg = int(msg.content)
|
|
await self.leave_confirmation(servers[msg], owner, ctx)
|
|
break
|
|
except (IndexError, ValueError, AttributeError):
|
|
pass
|
|
|
|
async def leave_confirmation(self, server, owner, ctx):
|
|
await self.bot.say("Are you sure you want me "
|
|
"to leave {}? (yes/no)".format(server.name))
|
|
|
|
msg = await self.bot.wait_for_message(author=owner, timeout=15)
|
|
|
|
if msg is None:
|
|
await self.bot.say("I guess not.")
|
|
elif msg.content.lower().strip() in ("yes", "y"):
|
|
await self.bot.leave_server(server)
|
|
if server != ctx.message.server:
|
|
await self.bot.say("Done.")
|
|
else:
|
|
await self.bot.say("Alright then.")
|
|
|
|
@commands.command(pass_context=True)
|
|
async def contact(self, ctx, *, message : str):
|
|
"""Sends message to the owner"""
|
|
if self.bot.settings.owner is None:
|
|
await self.bot.say("I have no owner set.")
|
|
return
|
|
owner = discord.utils.get(self.bot.get_all_members(),
|
|
id=self.bot.settings.owner)
|
|
author = ctx.message.author
|
|
if ctx.message.channel.is_private is False:
|
|
server = ctx.message.server
|
|
source = ", server **{}** ({})".format(server.name, server.id)
|
|
else:
|
|
source = ", direct message"
|
|
sender = "From **{}** ({}){}:\n\n".format(author, author.id, source)
|
|
message = sender + message
|
|
try:
|
|
await self.bot.send_message(owner, message)
|
|
except discord.errors.InvalidArgument:
|
|
await self.bot.say("I cannot send your message, I'm unable to find"
|
|
" my owner... *sigh*")
|
|
except discord.errors.HTTPException:
|
|
await self.bot.say("Your message is too long.")
|
|
except:
|
|
await self.bot.say("I'm unable to deliver your message. Sorry.")
|
|
else:
|
|
await self.bot.say("Your message has been sent.")
|
|
|
|
@commands.command()
|
|
async def info(self):
|
|
"""Shows info about Red"""
|
|
author_repo = "https://github.com/Twentysix26"
|
|
red_repo = author_repo + "/Red-DiscordBot"
|
|
server_url = "https://discord.gg/red"
|
|
dpy_repo = "https://github.com/Rapptz/discord.py"
|
|
python_url = "https://www.python.org/"
|
|
since = datetime.datetime(2016, 1, 2, 0, 0)
|
|
days_since = (datetime.datetime.utcnow() - since).days
|
|
dpy_version = "[{}]({})".format(discord.__version__, dpy_repo)
|
|
py_version = "[{}.{}.{}]({})".format(*os.sys.version_info[:3],
|
|
python_url)
|
|
|
|
owner_set = self.bot.settings.owner is not None
|
|
owner = self.bot.settings.owner if owner_set else None
|
|
if owner:
|
|
owner = discord.utils.get(self.bot.get_all_members(), id=owner)
|
|
if not owner:
|
|
try:
|
|
owner = await self.bot.get_user_info(self.bot.settings.owner)
|
|
except:
|
|
owner = None
|
|
if not owner:
|
|
owner = "Unknown"
|
|
|
|
about = (
|
|
"This is an instance of [Red, an open source Discord bot]({}) "
|
|
"created by [Twentysix]({}) and improved by many.\n\n"
|
|
"Red is backed by a passionate community who contributes and "
|
|
"creates content for everyone to enjoy. [Join us today]({}) "
|
|
"and help us improve!\n\n"
|
|
"".format(red_repo, author_repo, server_url))
|
|
|
|
embed = discord.Embed(colour=discord.Colour.red())
|
|
embed.add_field(name="Instance owned by", value=str(owner))
|
|
embed.add_field(name="Python", value=py_version)
|
|
embed.add_field(name="discord.py", value=dpy_version)
|
|
embed.add_field(name="About Red", value=about, inline=False)
|
|
embed.set_footer(text="Bringing joy since 02 Jan 2016 (over "
|
|
"{} days ago!)".format(days_since))
|
|
|
|
try:
|
|
await self.bot.say(embed=embed)
|
|
except discord.HTTPException:
|
|
await self.bot.say("I need the `Embed links` permission "
|
|
"to send this")
|
|
|
|
@commands.command()
|
|
async def uptime(self):
|
|
"""Shows Red's uptime"""
|
|
since = self.bot.uptime.strftime("%Y-%m-%d %H:%M:%S")
|
|
passed = self.get_bot_uptime()
|
|
await self.bot.say("Been up for: **{}** (since {} UTC)"
|
|
"".format(passed, since))
|
|
|
|
@commands.command()
|
|
async def version(self):
|
|
"""Shows Red's current version"""
|
|
response = self.bot.loop.run_in_executor(None, self._get_version)
|
|
result = await asyncio.wait_for(response, timeout=10)
|
|
try:
|
|
await self.bot.say(embed=result)
|
|
except discord.HTTPException:
|
|
await self.bot.say("I need the `Embed links` permission "
|
|
"to send this")
|
|
|
|
def _load_cog(self, cogname):
|
|
if not self._does_cogfile_exist(cogname):
|
|
raise CogNotFoundError(cogname)
|
|
try:
|
|
mod_obj = importlib.import_module(cogname)
|
|
importlib.reload(mod_obj)
|
|
self.bot.load_extension(mod_obj.__name__)
|
|
except SyntaxError as e:
|
|
raise CogLoadError(*e.args)
|
|
except:
|
|
raise
|
|
|
|
def _unload_cog(self, cogname, reloading=False):
|
|
if not reloading and cogname == "cogs.owner":
|
|
raise OwnerUnloadWithoutReloadError(
|
|
"Can't unload the owner plugin :P")
|
|
try:
|
|
self.bot.unload_extension(cogname)
|
|
except:
|
|
raise CogUnloadError
|
|
|
|
def _list_cogs(self):
|
|
cogs = [os.path.basename(f) for f in glob.glob("cogs/*.py")]
|
|
return ["cogs." + os.path.splitext(f)[0] for f in cogs]
|
|
|
|
def _does_cogfile_exist(self, module):
|
|
if "cogs." not in module:
|
|
module = "cogs." + module
|
|
if module not in self._list_cogs():
|
|
return False
|
|
return True
|
|
|
|
def _wait_for_answer(self, author):
|
|
print(author.name + " requested to be set as owner. If this is you, "
|
|
"type 'yes'. Otherwise press enter.")
|
|
print()
|
|
print("*DO NOT* set anyone else as owner. This has security "
|
|
"repercussions.")
|
|
|
|
choice = "None"
|
|
while choice.lower() != "yes" and choice == "None":
|
|
choice = input("> ")
|
|
|
|
if choice == "yes":
|
|
self.bot.settings.owner = author.id
|
|
self.bot.settings.save_settings()
|
|
print(author.name + " has been set as owner.")
|
|
self.setowner_lock = False
|
|
self.owner.hidden = True
|
|
else:
|
|
print("The set owner request has been ignored.")
|
|
self.setowner_lock = False
|
|
|
|
def _get_version(self):
|
|
if not os.path.isdir(".git"):
|
|
msg = "This instance of Red hasn't been installed with git."
|
|
e = discord.Embed(title=msg,
|
|
colour=discord.Colour.red())
|
|
return e
|
|
|
|
commands = " && ".join((
|
|
r'git config --get remote.origin.url', # Remote URL
|
|
r'git rev-list --count HEAD', # Number of commits
|
|
r'git rev-parse --abbrev-ref HEAD', # Branch name
|
|
r'git show -s -n 3 HEAD --format="%cr|%s|%H"' # Last 3 commits
|
|
))
|
|
result = os.popen(commands).read()
|
|
url, ncommits, branch, commits = result.split("\n", 3)
|
|
if url.endswith(".git"):
|
|
url = url[:-4]
|
|
repo_name = url.split("/")[-1]
|
|
|
|
embed = discord.Embed(title="Updates of " + repo_name,
|
|
description="Last three updates",
|
|
colour=discord.Colour.red(),
|
|
url="{}/tree/{}".format(url, branch))
|
|
|
|
for line in commits.split('\n'):
|
|
if not line:
|
|
continue
|
|
when, commit, chash = line.split("|")
|
|
commit_url = url + "/commit/" + chash
|
|
content = "[{}]({}) - {} ".format(chash[:6], commit_url, commit)
|
|
embed.add_field(name=when, value=content, inline=False)
|
|
|
|
embed.set_footer(text="Total commits: " + ncommits)
|
|
|
|
return embed
|
|
|
|
def get_bot_uptime(self, *, brief=False):
|
|
# Courtesy of Danny
|
|
now = datetime.datetime.utcnow()
|
|
delta = now - self.bot.uptime
|
|
hours, remainder = divmod(int(delta.total_seconds()), 3600)
|
|
minutes, seconds = divmod(remainder, 60)
|
|
days, hours = divmod(hours, 24)
|
|
|
|
if not brief:
|
|
if days:
|
|
fmt = '{d} days, {h} hours, {m} minutes, and {s} seconds'
|
|
else:
|
|
fmt = '{h} hours, {m} minutes, and {s} seconds'
|
|
else:
|
|
fmt = '{h}h {m}m {s}s'
|
|
if days:
|
|
fmt = '{d}d ' + fmt
|
|
|
|
return fmt.format(d=days, h=hours, m=minutes, s=seconds)
|
|
|
|
|
|
def check_files():
|
|
if not os.path.isfile("data/red/disabled_commands.json"):
|
|
print("Creating empty disabled_commands.json...")
|
|
dataIO.save_json("data/red/disabled_commands.json", [])
|
|
|
|
|
|
def setup(bot):
|
|
check_files()
|
|
n = Owner(bot)
|
|
bot.add_cog(n)
|