[V3] Update code standards (black code format pass) (#1650)

* ran black: code formatter against `redbot/` with `-l 99`

* badge
This commit is contained in:
Michael H
2018-05-14 15:33:24 -04:00
committed by Will
parent e7476edd68
commit b88b5a2601
90 changed files with 3629 additions and 3223 deletions

View File

@@ -4,16 +4,15 @@ __all__ = ["Config", "__version__"]
class VersionInfo:
def __init__(self, major, minor, micro, releaselevel, serial):
self._levels = ['alpha', 'beta', 'final']
self._levels = ["alpha", "beta", "final"]
self.major = major
self.minor = minor
self.micro = micro
if releaselevel not in self._levels:
raise TypeError("'releaselevel' must be one of: {}".format(
', '.join(self._levels)
))
raise TypeError("'releaselevel' must be one of: {}".format(", ".join(self._levels)))
self.releaselevel = releaselevel
self.serial = serial
@@ -21,8 +20,9 @@ class VersionInfo:
def __lt__(self, other):
my_index = self._levels.index(self.releaselevel)
other_index = self._levels.index(other.releaselevel)
return (self.major, self.minor, self.micro, my_index, self.serial) < \
(other.major, other.minor, other.micro, other_index, other.serial)
return (self.major, self.minor, self.micro, my_index, self.serial) < (
other.major, other.minor, other.micro, other_index, other.serial
)
def __repr__(self):
return "VersionInfo(major={}, minor={}, micro={}, releaselevel={}, serial={})".format(
@@ -32,5 +32,6 @@ class VersionInfo:
def to_json(self):
return [self.major, self.minor, self.micro, self.releaselevel, self.serial]
__version__ = "3.0.0b14"
version_info = VersionInfo(3, 0, 0, 'beta', 14)
version_info = VersionInfo(3, 0, 0, "beta", 14)

View File

@@ -6,29 +6,36 @@ import discord
from redbot.core import Config
__all__ = ["Account", "get_balance", "set_balance", "withdraw_credits", "deposit_credits",
"can_spend", "transfer_credits", "wipe_bank", "get_account", "is_global",
"set_global", "get_bank_name", "set_bank_name", "get_currency_name",
"set_currency_name", "get_default_balance", "set_default_balance"]
__all__ = [
"Account",
"get_balance",
"set_balance",
"withdraw_credits",
"deposit_credits",
"can_spend",
"transfer_credits",
"wipe_bank",
"get_account",
"is_global",
"set_global",
"get_bank_name",
"set_bank_name",
"get_currency_name",
"set_currency_name",
"get_default_balance",
"set_default_balance",
]
_DEFAULT_GLOBAL = {
"is_global": False,
"bank_name": "Twentysix bank",
"currency": "credits",
"default_balance": 100
"default_balance": 100,
}
_DEFAULT_GUILD = {
"bank_name": "Twentysix bank",
"currency": "credits",
"default_balance": 100
}
_DEFAULT_GUILD = {"bank_name": "Twentysix bank", "currency": "credits", "default_balance": 100}
_DEFAULT_MEMBER = {
"name": "",
"balance": 0,
"created_at": 0
}
_DEFAULT_MEMBER = {"name": "", "balance": 0, "created_at": 0}
_DEFAULT_USER = _DEFAULT_MEMBER
@@ -50,9 +57,9 @@ def _register_defaults():
_conf.register_member(**_DEFAULT_MEMBER)
_conf.register_user(**_DEFAULT_USER)
if not os.environ.get('BUILDING_DOCS'):
_conf = Config.get_conf(
None, 384734293238749, cog_name="Bank", force_registration=True)
if not os.environ.get("BUILDING_DOCS"):
_conf = Config.get_conf(None, 384734293238749, cog_name="Bank", force_registration=True)
_register_defaults()
@@ -285,7 +292,7 @@ async def wipe_bank():
await _conf.clear_all_members()
async def get_leaderboard(positions: int=None, guild: discord.Guild=None) -> List[tuple]:
async def get_leaderboard(positions: int = None, guild: discord.Guild = None) -> List[tuple]:
"""
Gets the bank's leaderboard
@@ -319,14 +326,16 @@ async def get_leaderboard(positions: int=None, guild: discord.Guild=None) -> Lis
if guild is None:
raise TypeError("Expected a guild, got NoneType object instead!")
raw_accounts = await _conf.all_members(guild)
sorted_acc = sorted(raw_accounts.items(), key=lambda x: x[1]['balance'], reverse=True)
sorted_acc = sorted(raw_accounts.items(), key=lambda x: x[1]["balance"], reverse=True)
if positions is None:
return sorted_acc
else:
return sorted_acc[:positions]
async def get_leaderboard_position(member: Union[discord.User, discord.Member]) -> Union[int, None]:
async def get_leaderboard_position(
member: Union[discord.User, discord.Member]
) -> Union[int, None]:
"""
Get the leaderboard position for the specified user
@@ -387,13 +396,13 @@ async def get_account(member: Union[discord.Member, discord.User]) -> Account:
if acc_data == {}:
acc_data = default
acc_data['name'] = member.display_name
acc_data["name"] = member.display_name
try:
acc_data['balance'] = await get_default_balance(member.guild)
acc_data["balance"] = await get_default_balance(member.guild)
except AttributeError:
acc_data['balance'] = await get_default_balance()
acc_data["balance"] = await get_default_balance()
acc_data['created_at'] = _decode_time(acc_data['created_at'])
acc_data["created_at"] = _decode_time(acc_data["created_at"])
return Account(**acc_data)
@@ -444,7 +453,7 @@ async def set_global(global_: bool) -> bool:
return global_
async def get_bank_name(guild: discord.Guild=None) -> str:
async def get_bank_name(guild: discord.Guild = None) -> str:
"""Get the current bank name.
Parameters
@@ -472,7 +481,7 @@ async def get_bank_name(guild: discord.Guild=None) -> str:
raise RuntimeError("Guild parameter is required and missing.")
async def set_bank_name(name: str, guild: discord.Guild=None) -> str:
async def set_bank_name(name: str, guild: discord.Guild = None) -> str:
"""Set the bank name.
Parameters
@@ -499,12 +508,13 @@ async def set_bank_name(name: str, guild: discord.Guild=None) -> str:
elif guild is not None:
await _conf.guild(guild).bank_name.set(name)
else:
raise RuntimeError("Guild must be provided if setting the name of a guild"
"-specific bank.")
raise RuntimeError(
"Guild must be provided if setting the name of a guild" "-specific bank."
)
return name
async def get_currency_name(guild: discord.Guild=None) -> str:
async def get_currency_name(guild: discord.Guild = None) -> str:
"""Get the currency name of the bank.
Parameters
@@ -532,7 +542,7 @@ async def get_currency_name(guild: discord.Guild=None) -> str:
raise RuntimeError("Guild must be provided.")
async def set_currency_name(name: str, guild: discord.Guild=None) -> str:
async def set_currency_name(name: str, guild: discord.Guild = None) -> str:
"""Set the currency name for the bank.
Parameters
@@ -559,12 +569,13 @@ async def set_currency_name(name: str, guild: discord.Guild=None) -> str:
elif guild is not None:
await _conf.guild(guild).currency.set(name)
else:
raise RuntimeError("Guild must be provided if setting the currency"
" name of a guild-specific bank.")
raise RuntimeError(
"Guild must be provided if setting the currency" " name of a guild-specific bank."
)
return name
async def get_default_balance(guild: discord.Guild=None) -> int:
async def get_default_balance(guild: discord.Guild = None) -> int:
"""Get the current default balance amount.
Parameters
@@ -592,7 +603,7 @@ async def get_default_balance(guild: discord.Guild=None) -> int:
raise RuntimeError("Guild is missing and required!")
async def set_default_balance(amount: int, guild: discord.Guild=None) -> int:
async def set_default_balance(amount: int, guild: discord.Guild = None) -> int:
"""Set the default balance amount.
Parameters

View File

@@ -14,15 +14,11 @@ from discord.ext.commands import when_mentioned_or
# This supresses the PyNaCl warning that isn't relevant here
from discord.voice_client import VoiceClient
VoiceClient.warn_nacl = False
from .cog_manager import CogManager
from . import (
Config,
i18n,
commands,
rpc
)
from . import Config, i18n, commands, rpc
from .help_formatter import Help, help as help_
from .sentry import SentryManager
from .utils import TYPE_CHECKING
@@ -32,6 +28,7 @@ if TYPE_CHECKING:
# noinspection PyUnresolvedReferences
class RpcMethodMixin:
async def rpc__cogs(self, request):
return list(self.cogs.keys())
@@ -48,7 +45,8 @@ class RedBase(BotBase, RpcMethodMixin):
Selfbots should inherit from this mixin along with `discord.Client`.
"""
def __init__(self, cli_flags, bot_dir: Path=Path.cwd(), **kwargs):
def __init__(self, cli_flags, bot_dir: Path = Path.cwd(), **kwargs):
self._shutdown_mode = ExitCodes.CRITICAL
self.db = Config.get_core_conf(force_registration=True)
self._co_owners = cli_flags.co_owner
@@ -62,22 +60,15 @@ class RedBase(BotBase, RpcMethodMixin):
whitelist=[],
blacklist=[],
enable_sentry=None,
locale='en',
embeds=True
locale="en",
embeds=True,
)
self.db.register_guild(
prefix=[],
whitelist=[],
blacklist=[],
admin_role=None,
mod_role=None,
embeds=None
prefix=[], whitelist=[], blacklist=[], admin_role=None, mod_role=None, embeds=None
)
self.db.register_user(
embeds=None
)
self.db.register_user(embeds=None)
async def prefix_manager(bot, message):
if not cli_flags.prefix:
@@ -88,9 +79,13 @@ class RedBase(BotBase, RpcMethodMixin):
return global_prefix
server_prefix = await bot.db.guild(message.guild).prefix()
if cli_flags.mentionable:
return when_mentioned_or(*server_prefix)(bot, message) \
if server_prefix else \
when_mentioned_or(*global_prefix)(bot, message)
return when_mentioned_or(*server_prefix)(
bot, message
) if server_prefix else when_mentioned_or(
*global_prefix
)(
bot, message
)
else:
return server_prefix if server_prefix else global_prefix
@@ -109,13 +104,13 @@ class RedBase(BotBase, RpcMethodMixin):
self.main_dir = bot_dir
self.cog_mgr = CogManager(paths=(str(self.main_dir / 'cogs'),))
self.cog_mgr = CogManager(paths=(str(self.main_dir / "cogs"),))
self.register_rpc_methods()
super().__init__(formatter=Help(), **kwargs)
self.remove_command('help')
self.remove_command("help")
self.add_command(help_)
@@ -124,7 +119,7 @@ class RedBase(BotBase, RpcMethodMixin):
def enable_sentry(self):
"""Enable Sentry logging for Red."""
if self._sentry_mgr is None:
sentry_log = logging.getLogger('red.sentry')
sentry_log = logging.getLogger("red.sentry")
sentry_log.setLevel(logging.WARNING)
self._sentry_mgr = SentryManager(sentry_log)
self._sentry_mgr.enable()
@@ -143,7 +138,7 @@ class RedBase(BotBase, RpcMethodMixin):
:return:
"""
indict['owner_id'] = await self.db.owner()
indict["owner_id"] = await self.db.owner()
i18n.set_locale(await self.db.locale())
async def embed_requested(self, channel, user, command=None) -> bool:
@@ -164,8 +159,9 @@ class RedBase(BotBase, RpcMethodMixin):
bool
:code:`True` if an embed is requested
"""
if isinstance(channel, discord.abc.PrivateChannel) or (
command and command == self.get_command("help")
if (
isinstance(channel, discord.abc.PrivateChannel)
or (command and command == self.get_command("help"))
):
user_setting = await self.db.user(user).embeds()
if user_setting is not None:
@@ -214,14 +210,14 @@ class RedBase(BotBase, RpcMethodMixin):
curr_pkgs.remove(pkg_name)
async def load_extension(self, spec: ModuleSpec):
name = spec.name.split('.')[-1]
name = spec.name.split(".")[-1]
if name in self.extensions:
return
lib = spec.loader.load_module()
if not hasattr(lib, 'setup'):
if not hasattr(lib, "setup"):
del lib
raise discord.ClientException('extension does not have a setup function')
raise discord.ClientException("extension does not have a setup function")
if asyncio.iscoroutinefunction(lib.setup):
await lib.setup(self)
@@ -262,7 +258,7 @@ class RedBase(BotBase, RpcMethodMixin):
del event_list[index]
try:
func = getattr(lib, 'teardown')
func = getattr(lib, "teardown")
except AttributeError:
pass
else:
@@ -279,19 +275,20 @@ class RedBase(BotBase, RpcMethodMixin):
if m.startswith(pkg_name):
del sys.modules[m]
if pkg_name.startswith('redbot.cogs'):
del sys.modules['redbot.cogs'].__dict__[name]
if pkg_name.startswith("redbot.cogs"):
del sys.modules["redbot.cogs"].__dict__[name]
def register_rpc_methods(self):
rpc.add_method('bot', self.rpc__cogs)
rpc.add_method('bot', self.rpc__extensions)
rpc.add_method("bot", self.rpc__cogs)
rpc.add_method("bot", self.rpc__extensions)
class Red(RedBase, discord.AutoShardedClient):
"""
You're welcome Caleb.
"""
async def shutdown(self, *, restart: bool=False):
async def shutdown(self, *, restart: bool = False):
"""Gracefully quit Red.
The program will exit with code :code:`0` by default.
@@ -314,4 +311,4 @@ class Red(RedBase, discord.AutoShardedClient):
class ExitCodes(Enum):
CRITICAL = 1
SHUTDOWN = 0
RESTART = 26
RESTART = 26

View File

@@ -5,23 +5,22 @@ from discord.ext import commands
async def check_overrides(ctx, *, level):
if await ctx.bot.is_owner(ctx.author):
return True
perm_cog = ctx.bot.get_cog('Permissions')
perm_cog = ctx.bot.get_cog("Permissions")
if not perm_cog or ctx.cog == perm_cog:
return None
# don't break if someone loaded a cog named
# permissions that doesn't implement this
func = getattr(perm_cog, 'check_overrides', None)
func = getattr(perm_cog, "check_overrides", None)
val = None if func is None else await func(ctx, level)
return val
def is_owner(**kwargs):
async def check(ctx):
override = await check_overrides(ctx, level='owner')
return (
override if override is not None
else await ctx.bot.is_owner(ctx.author, **kwargs)
)
override = await check_overrides(ctx, level="owner")
return (override if override is not None else await ctx.bot.is_owner(ctx.author, **kwargs))
return commands.check(check)
@@ -32,10 +31,7 @@ async def check_permissions(ctx, perms):
return False
resolved = ctx.channel.permissions_for(ctx.author)
return all(
getattr(resolved, name, None) == value
for name, value in perms.items()
)
return all(getattr(resolved, name, None) == value for name, value in perms.items())
async def is_mod_or_superior(ctx):
@@ -75,47 +71,49 @@ async def is_admin_or_superior(ctx):
def mod_or_permissions(**perms):
async def predicate(ctx):
override = await check_overrides(ctx, level='mod')
override = await check_overrides(ctx, level="mod")
return (
override if override is not None
else await check_permissions(ctx, perms)
or await is_mod_or_superior(ctx)
override
if override is not None
else await check_permissions(ctx, perms) or await is_mod_or_superior(ctx)
)
return commands.check(predicate)
def admin_or_permissions(**perms):
async def predicate(ctx):
override = await check_overrides(ctx, level='admin')
override = await check_overrides(ctx, level="admin")
return (
override if override is not None
else await check_permissions(ctx, perms)
or await is_admin_or_superior(ctx)
override
if override is not None
else await check_permissions(ctx, perms) or await is_admin_or_superior(ctx)
)
return commands.check(predicate)
def bot_in_a_guild(**kwargs):
async def predicate(ctx):
return len(ctx.bot.guilds) > 0
return commands.check(predicate)
def guildowner_or_permissions(**perms):
async def predicate(ctx):
has_perms_or_is_owner = await check_permissions(ctx, perms)
if ctx.guild is None:
return has_perms_or_is_owner
is_guild_owner = ctx.author == ctx.guild.owner
override = await check_overrides(ctx, level='guildowner')
return (
override if override is not None
else is_guild_owner or has_perms_or_is_owner
)
override = await check_overrides(ctx, level="guildowner")
return (override if override is not None else is_guild_owner or has_perms_or_is_owner)
return commands.check(predicate)

View File

@@ -26,16 +26,17 @@ def interactive_config(red, token_set, prefix_set):
if not prefix_set:
prefix = ""
print("\nPick a prefix. A prefix is what you type before a "
"command. Example:\n"
"!help\n^ The exclamation mark is the prefix in this case.\n"
"Can be multiple characters. You will be able to change it "
"later and add more of them.\nChoose your prefix:\n")
print(
"\nPick a prefix. A prefix is what you type before a "
"command. Example:\n"
"!help\n^ The exclamation mark is the prefix in this case.\n"
"Can be multiple characters. You will be able to change it "
"later and add more of them.\nChoose your prefix:\n"
)
while not prefix:
prefix = input("Prefix> ")
if len(prefix) > 10:
print("Your prefix seems overly long. Are you sure it "
"is correct? (y/n)")
print("Your prefix seems overly long. Are you sure it " "is correct? (y/n)")
if not confirm("> "):
prefix = ""
if prefix:
@@ -48,12 +49,14 @@ def interactive_config(red, token_set, prefix_set):
def ask_sentry(red: Red):
loop = asyncio.get_event_loop()
print("\nThank you for installing Red V3 beta! The current version\n"
" is not suited for production use and is aimed at testing\n"
" the current and upcoming featureset, that's why we will\n"
" also collect the fatal error logs to help us fix any new\n"
" found issues in a timely manner. If you wish to opt in\n"
" the process please type \"yes\":\n")
print(
"\nThank you for installing Red V3 beta! The current version\n"
" is not suited for production use and is aimed at testing\n"
" the current and upcoming featureset, that's why we will\n"
" also collect the fatal error logs to help us fix any new\n"
" found issues in a timely manner. If you wish to opt in\n"
' the process please type "yes":\n'
)
if not confirm("> "):
loop.run_until_complete(red.db.enable_sentry.set(False))
else:
@@ -62,64 +65,82 @@ def ask_sentry(red: Red):
def parse_cli_flags(args):
parser = argparse.ArgumentParser(description="Red - Discord Bot",
usage="redbot <instance_name> [arguments]")
parser.add_argument("--version", "-V", action="store_true",
help="Show Red's current version")
parser.add_argument("--list-instances", action="store_true",
help="List all instance names setup "
"with 'redbot-setup'")
parser.add_argument("--owner", type=int,
help="ID of the owner. Only who hosts "
"Red should be owner, this has "
"serious security implications if misused.")
parser.add_argument("--co-owner", type=int, default=[], nargs="*",
help="ID of a co-owner. Only people who have access "
"to the system that is hosting Red should be "
"co-owners, as this gives them complete access "
"to the system's data. This has serious "
"security implications if misused. Can be "
"multiple.")
parser.add_argument("--prefix", "-p", action="append",
help="Global prefix. Can be multiple")
parser.add_argument("--no-prompt", action="store_true",
help="Disables console inputs. Features requiring "
"console interaction could be disabled as a "
"result")
parser.add_argument("--no-cogs",
action="store_true",
help="Starts Red with no cogs loaded, only core")
parser.add_argument("--load-cogs", type=str, nargs="*",
help="Force loading specified cogs from the installed packages. "
"Can be used with the --no-cogs flag to load these cogs exclusively.")
parser.add_argument("--self-bot",
action='store_true',
help="Specifies if Red should log in as selfbot")
parser.add_argument("--not-bot",
action='store_true',
help="Specifies if the token used belongs to a bot "
"account.")
parser.add_argument("--dry-run",
action="store_true",
help="Makes Red quit with code 0 just before the "
"login. This is useful for testing the boot "
"process.")
parser.add_argument("--debug",
action="store_true",
help="Sets the loggers level as debug")
parser.add_argument("--dev",
action="store_true",
help="Enables developer mode")
parser.add_argument("--mentionable",
action="store_true",
help="Allows mentioning the bot as an alternative "
"to using the bot prefix")
parser.add_argument("--rpc",
action="store_true",
help="Enables the built-in RPC server. Please read the docs"
"prior to enabling this!")
parser.add_argument("instance_name", nargs="?",
help="Name of the bot instance created during `redbot-setup`.")
parser = argparse.ArgumentParser(
description="Red - Discord Bot", usage="redbot <instance_name> [arguments]"
)
parser.add_argument("--version", "-V", action="store_true", help="Show Red's current version")
parser.add_argument(
"--list-instances",
action="store_true",
help="List all instance names setup " "with 'redbot-setup'",
)
parser.add_argument(
"--owner",
type=int,
help="ID of the owner. Only who hosts "
"Red should be owner, this has "
"serious security implications if misused.",
)
parser.add_argument(
"--co-owner",
type=int,
default=[],
nargs="*",
help="ID of a co-owner. Only people who have access "
"to the system that is hosting Red should be "
"co-owners, as this gives them complete access "
"to the system's data. This has serious "
"security implications if misused. Can be "
"multiple.",
)
parser.add_argument("--prefix", "-p", action="append", help="Global prefix. Can be multiple")
parser.add_argument(
"--no-prompt",
action="store_true",
help="Disables console inputs. Features requiring "
"console interaction could be disabled as a "
"result",
)
parser.add_argument(
"--no-cogs", action="store_true", help="Starts Red with no cogs loaded, only core"
)
parser.add_argument(
"--load-cogs",
type=str,
nargs="*",
help="Force loading specified cogs from the installed packages. "
"Can be used with the --no-cogs flag to load these cogs exclusively.",
)
parser.add_argument(
"--self-bot", action="store_true", help="Specifies if Red should log in as selfbot"
)
parser.add_argument(
"--not-bot",
action="store_true",
help="Specifies if the token used belongs to a bot " "account.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Makes Red quit with code 0 just before the "
"login. This is useful for testing the boot "
"process.",
)
parser.add_argument("--debug", action="store_true", help="Sets the loggers level as debug")
parser.add_argument("--dev", action="store_true", help="Enables developer mode")
parser.add_argument(
"--mentionable",
action="store_true",
help="Allows mentioning the bot as an alternative " "to using the bot prefix",
)
parser.add_argument(
"--rpc",
action="store_true",
help="Enables the built-in RPC server. Please read the docs" "prior to enabling this!",
)
parser.add_argument(
"instance_name", nargs="?", help="Name of the bot instance created during `redbot-setup`."
)
args = parser.parse_args(args)
@@ -129,4 +150,3 @@ def parse_cli_flags(args):
args.prefix = []
return args

View File

@@ -34,14 +34,12 @@ class CogManager:
install new cogs to, the default being the :code:`cogs/` folder in the root
bot directory.
"""
def __init__(self, paths: Tuple[str]=()):
def __init__(self, paths: Tuple[str] = ()):
self.conf = Config.get_conf(self, 2938473984732, True)
tmp_cog_install_path = cog_data_path(self) / "cogs"
tmp_cog_install_path.mkdir(parents=True, exist_ok=True)
self.conf.register_global(
paths=(),
install_path=str(tmp_cog_install_path)
)
self.conf.register_global(paths=(), install_path=str(tmp_cog_install_path))
self._paths = [Path(p) for p in paths]
@@ -158,7 +156,7 @@ class CogManager:
if path == await self.install_path():
raise ValueError("Cannot add the install path as an additional path.")
all_paths = _deduplicate(await self.paths() + (path, ))
all_paths = _deduplicate(await self.paths() + (path,))
# noinspection PyTypeChecker
await self.set_paths(all_paths)
@@ -225,8 +223,10 @@ class CogManager:
if spec:
return spec
raise RuntimeError("No 3rd party module by the name of '{}' was found"
" in any available path.".format(name))
raise RuntimeError(
"No 3rd party module by the name of '{}' was found"
" in any available path.".format(name)
)
async def _find_core_cog(self, name: str) -> ModuleSpec:
"""
@@ -247,10 +247,11 @@ class CogManager:
"""
real_name = ".{}".format(name)
try:
mod = import_module(real_name, package='redbot.cogs')
mod = import_module(real_name, package="redbot.cogs")
except ImportError as e:
raise RuntimeError("No core cog by the name of '{}' could"
"be found.".format(name)) from e
raise RuntimeError(
"No core cog by the name of '{}' could" "be found.".format(name)
) from e
return mod.__spec__
# noinspection PyUnreachableCode
@@ -284,7 +285,7 @@ class CogManager:
async def available_modules(self) -> List[str]:
"""Finds the names of all available modules to load.
"""
paths = (await self.install_path(), ) + await self.paths()
paths = (await self.install_path(),) + await self.paths()
paths = [str(p) for p in paths]
ret = []
@@ -341,8 +342,9 @@ class CogManagerUI:
Add a path to the list of available cog paths.
"""
if not path.is_dir():
await ctx.send(_("That path does not exist or does not"
" point to a valid directory."))
await ctx.send(
_("That path does not exist or does not" " point to a valid directory.")
)
return
try:
@@ -398,7 +400,7 @@ class CogManagerUI:
@commands.command()
@checks.is_owner()
async def installpath(self, ctx: commands.Context, path: Path=None):
async def installpath(self, ctx: commands.Context, path: Path = None):
"""
Returns the current install path or sets it if one is provided.
The provided path must be absolute or relative to the bot's
@@ -416,8 +418,9 @@ class CogManagerUI:
return
install_path = await ctx.bot.cog_mgr.install_path()
await ctx.send(_("The bot will install new cogs to the `{}`"
" directory.").format(install_path))
await ctx.send(
_("The bot will install new cogs to the `{}`" " directory.").format(install_path)
)
@commands.command()
@checks.is_owner()
@@ -435,22 +438,20 @@ class CogManagerUI:
unloaded = sorted(list(unloaded), key=str.lower)
if await ctx.embed_requested():
loaded = ('**{} loaded:**\n').format(len(loaded)) + ", ".join(loaded)
unloaded = ('**{} unloaded:**\n').format(len(unloaded)) + ", ".join(unloaded)
loaded = ("**{} loaded:**\n").format(len(loaded)) + ", ".join(loaded)
unloaded = ("**{} unloaded:**\n").format(len(unloaded)) + ", ".join(unloaded)
for page in pagify(loaded, delims=[', ', '\n'], page_length=1800):
e = discord.Embed(description=page,
colour=discord.Colour.dark_green())
for page in pagify(loaded, delims=[", ", "\n"], page_length=1800):
e = discord.Embed(description=page, colour=discord.Colour.dark_green())
await ctx.send(embed=e)
for page in pagify(unloaded, delims=[', ', '\n'], page_length=1800):
e = discord.Embed(description=page,
colour=discord.Colour.dark_red())
for page in pagify(unloaded, delims=[", ", "\n"], page_length=1800):
e = discord.Embed(description=page, colour=discord.Colour.dark_red())
await ctx.send(embed=e)
else:
loaded_count = '**{} loaded:**\n'.format(len(loaded))
loaded_count = "**{} loaded:**\n".format(len(loaded))
loaded = ", ".join(loaded)
unloaded_count = '**{} unloaded:**\n'.format(len(unloaded))
unloaded_count = "**{} unloaded:**\n".format(len(unloaded))
unloaded = ", ".join(unloaded)
loaded_count_sent = False
unloaded_count_sent = False

View File

@@ -20,7 +20,7 @@ class Command(commands.Command):
"""
def __init__(self, *args, **kwargs):
self._help_override = kwargs.pop('help_override', None)
self._help_override = kwargs.pop("help_override", None)
super().__init__(*args, **kwargs)
self.translator = kwargs.pop("i18n", None)
@@ -40,7 +40,7 @@ class Command(commands.Command):
translator = self.translator
command_doc = self.callback.__doc__
if command_doc is None:
return ''
return ""
return inspect.cleandoc(translator(command_doc))
@help.setter
@@ -60,6 +60,7 @@ class Group(Command, commands.Group):
# decorators
def command(name=None, cls=Command, **attrs):
"""A decorator which transforms an async function into a `Command`.

View File

@@ -59,10 +59,9 @@ class Context(commands.Context):
else:
return True
async def send_interactive(self,
messages: Iterable[str],
box_lang: str=None,
timeout: int=15) -> List[discord.Message]:
async def send_interactive(
self, messages: Iterable[str], box_lang: str = None, timeout: int = 15
) -> List[discord.Message]:
"""Send multiple messages interactively.
The user will be prompted for whether or not they would like to view
@@ -84,9 +83,9 @@ class Context(commands.Context):
messages = tuple(messages)
ret = []
more_check = lambda m: (m.author == self.author and
m.channel == self.channel and
m.content.lower() == "more")
more_check = lambda m: (
m.author == self.author and m.channel == self.channel and m.content.lower() == "more"
)
for idx, page in enumerate(messages, 1):
if box_lang is None:
@@ -105,10 +104,10 @@ class Context(commands.Context):
query = await self.send(
"There {} still {} message{} remaining. "
"Type `more` to continue."
"".format(is_are, n_remaining, plural))
"".format(is_are, n_remaining, plural)
)
try:
resp = await self.bot.wait_for(
'message', check=more_check, timeout=timeout)
resp = await self.bot.wait_for("message", check=more_check, timeout=timeout)
except asyncio.TimeoutError:
await query.delete()
break
@@ -134,9 +133,7 @@ class Context(commands.Context):
"""
if self.guild and not self.channel.permissions_for(self.guild.me).embed_links:
return False
return await self.bot.embed_requested(
self.channel, self.author, command=self.command
)
return await self.bot.embed_requested(self.channel, self.author, command=self.command)
async def maybe_send_embed(self, message: str) -> discord.Message:
"""

View File

@@ -38,9 +38,11 @@ class _ValueCtxManager:
async def __aenter__(self):
self.raw_value = await self
if not isinstance(self.raw_value, (list, dict)):
raise TypeError("Type of retrieved value must be mutable (i.e. "
"list or dict) in order to use a config value as "
"a context manager.")
raise TypeError(
"Type of retrieved value must be mutable (i.e. "
"list or dict) in order to use a config value as "
"a context manager."
)
return self.raw_value
async def __aexit__(self, *exc_info):
@@ -61,6 +63,7 @@ class Value:
A reference to `Config.driver`.
"""
def __init__(self, identifiers: Tuple[str], default_value, driver):
self._identifiers = identifiers
self.default = default_value
@@ -168,10 +171,10 @@ class Group(Value):
A reference to `Config.driver`.
"""
def __init__(self, identifiers: Tuple[str],
defaults: dict,
driver,
force_registration: bool=False):
def __init__(
self, identifiers: Tuple[str], defaults: dict, driver, force_registration: bool = False
):
self._defaults = defaults
self.force_registration = force_registration
self.driver = driver
@@ -209,31 +212,22 @@ class Group(Value):
"""
is_group = self.is_group(item)
is_value = not is_group and self.is_value(item)
new_identifiers = self.identifiers + (item, )
new_identifiers = self.identifiers + (item,)
if is_group:
return Group(
identifiers=new_identifiers,
defaults=self._defaults[item],
driver=self.driver,
force_registration=self.force_registration
force_registration=self.force_registration,
)
elif is_value:
return Value(
identifiers=new_identifiers,
default_value=self._defaults[item],
driver=self.driver
identifiers=new_identifiers, default_value=self._defaults[item], driver=self.driver
)
elif self.force_registration:
raise AttributeError(
"'{}' is not a valid registered Group "
"or value.".format(item)
)
raise AttributeError("'{}' is not a valid registered Group " "or value.".format(item))
else:
return Value(
identifiers=new_identifiers,
default_value=None,
driver=self.driver
)
return Value(identifiers=new_identifiers, default_value=None, driver=self.driver)
def is_group(self, item: str) -> bool:
"""A helper method for `__getattr__`. Most developers will have no need
@@ -385,9 +379,7 @@ class Group(Value):
async def set(self, value):
if not isinstance(value, dict):
raise ValueError(
"You may only set the value of a group to be a dict."
)
raise ValueError("You may only set the value of a group to be a dict.")
await super().set(value)
async def set_raw(self, *nested_path: str, value):
@@ -456,10 +448,14 @@ class Config:
USER = "USER"
MEMBER = "MEMBER"
def __init__(self, cog_name: str, unique_identifier: str,
driver: "BaseDriver",
force_registration: bool=False,
defaults: dict=None):
def __init__(
self,
cog_name: str,
unique_identifier: str,
driver: "BaseDriver",
force_registration: bool = False,
defaults: dict = None,
):
self.cog_name = cog_name
self.unique_identifier = unique_identifier
@@ -472,8 +468,7 @@ class Config:
return deepcopy(self._defaults)
@classmethod
def get_conf(cls, cog_instance, identifier: int,
force_registration=False, cog_name=None):
def get_conf(cls, cog_instance, identifier: int, force_registration=False, cog_name=None):
"""Get a Config instance for your cog.
.. warning::
@@ -519,20 +514,24 @@ class Config:
log.debug("Basic config: \n\n{}".format(basic_config))
driver_name = basic_config.get('STORAGE_TYPE', 'JSON')
driver_details = basic_config.get('STORAGE_DETAILS', {})
driver_name = basic_config.get("STORAGE_TYPE", "JSON")
driver_details = basic_config.get("STORAGE_DETAILS", {})
log.debug("Using driver: '{}'".format(driver_name))
driver = get_driver(driver_name, cog_name, uuid, data_path_override=cog_path_override,
**driver_details)
conf = cls(cog_name=cog_name, unique_identifier=uuid,
force_registration=force_registration,
driver=driver)
driver = get_driver(
driver_name, cog_name, uuid, data_path_override=cog_path_override, **driver_details
)
conf = cls(
cog_name=cog_name,
unique_identifier=uuid,
force_registration=force_registration,
driver=driver,
)
return conf
@classmethod
def get_core_conf(cls, force_registration: bool=False):
def get_core_conf(cls, force_registration: bool = False):
"""Get a Config instance for a core module.
All core modules that require a config instance should use this
@@ -549,14 +548,18 @@ class Config:
# We have to import this here otherwise we have a circular dependency
from .data_manager import basic_config
driver_name = basic_config.get('STORAGE_TYPE', 'JSON')
driver_details = basic_config.get('STORAGE_DETAILS', {})
driver_name = basic_config.get("STORAGE_TYPE", "JSON")
driver_details = basic_config.get("STORAGE_DETAILS", {})
driver = get_driver(driver_name, "Core", '0', data_path_override=core_path,
**driver_details)
conf = cls(cog_name="Core", driver=driver,
unique_identifier='0',
force_registration=force_registration)
driver = get_driver(
driver_name, "Core", "0", data_path_override=core_path, **driver_details
)
conf = cls(
cog_name="Core",
driver=driver,
unique_identifier="0",
force_registration=force_registration,
)
return conf
def __getattr__(self, item: str) -> Union[Group, Value]:
@@ -593,7 +596,7 @@ class Config:
"""
ret = {}
partial = ret
splitted = key.split('__')
splitted = key.split("__")
for i, k in enumerate(splitted, start=1):
if not k.isidentifier():
raise RuntimeError("'{}' is an invalid config key.".format(k))
@@ -621,8 +624,9 @@ class Config:
existing_is_dict = isinstance(_partial[k], dict)
if val_is_dict != existing_is_dict:
# != is XOR
raise KeyError("You cannot register a Group and a Value under"
" the same name.")
raise KeyError(
"You cannot register a Group and a Value under" " the same name."
)
if val_is_dict:
Config._update_defaults(v, _partial=_partial[k])
else:
@@ -736,7 +740,7 @@ class Config:
identifiers=(key, *identifiers),
defaults=self.defaults.get(key, {}),
driver=self.driver,
force_registration=self.force_registration
force_registration=self.force_registration,
)
def guild(self, guild: discord.Guild) -> Group:
@@ -935,7 +939,7 @@ class Config:
ret[int(member_id)] = new_member_data
return ret
async def all_members(self, guild: discord.Guild=None) -> dict:
async def all_members(self, guild: discord.Guild = None) -> dict:
"""Get data for all members.
If :code:`guild` is specified, only the data for the members of that
@@ -965,8 +969,7 @@ class Config:
group = self._get_base_group(self.MEMBER)
dict_ = await group()
for guild_id, guild_data in dict_.items():
ret[int(guild_id)] = self._all_members_from_guild(
group, guild_data)
ret[int(guild_id)] = self._all_members_from_guild(group, guild_data)
else:
group = self._get_base_group(self.MEMBER, guild.id)
guild_data = await group()
@@ -992,9 +995,7 @@ class Config:
"""
if not scopes:
group = Group(identifiers=[],
defaults={},
driver=self.driver)
group = Group(identifiers=[], defaults={}, driver=self.driver)
else:
group = self._get_base_group(*scopes)
await group.clear()
@@ -1046,7 +1047,7 @@ class Config:
"""
await self._clear_scope(self.USER)
async def clear_all_members(self, guild: discord.Guild=None):
async def clear_all_members(self, guild: discord.Guild = None):
"""Clear all member data.
This resets all specified member data to its registered defaults.

View File

@@ -32,10 +32,12 @@ __all__ = ["Core"]
log = logging.getLogger("red")
OWNER_DISCLAIMER = ("⚠ **Only** the person who is hosting Red should be "
"owner. **This has SERIOUS security implications. The "
"owner can access any data that is present on the host "
"system.** ⚠")
OWNER_DISCLAIMER = (
"⚠ **Only** the person who is hosting Red should be "
"owner. **This has SERIOUS security implications. The "
"owner can access any data that is present on the host "
"system.** ⚠"
)
_ = i18n.Translator("Core", __file__)
@@ -44,12 +46,13 @@ _ = i18n.Translator("Core", __file__)
@i18n.cog_i18n(_)
class Core:
"""Commands related to core functions"""
def __init__(self, bot):
self.bot = bot # type: Red
rpc.add_method('core', self.rpc_load)
rpc.add_method('core', self.rpc_unload)
rpc.add_method('core', self.rpc_reload)
rpc.add_method("core", self.rpc_load)
rpc.add_method("core", self.rpc_unload)
rpc.add_method("core", self.rpc_reload)
@commands.command(hidden=True)
async def ping(self, ctx):
@@ -72,15 +75,13 @@ class Core:
since = datetime.datetime(2016, 1, 2, 0, 0)
days_since = (datetime.datetime.utcnow() - since).days
dpy_version = "[{}]({})".format(discord.__version__, dpy_repo)
python_version = "[{}.{}.{}]({})".format(
*sys.version_info[:3], python_url
)
python_version = "[{}.{}.{}]({})".format(*sys.version_info[:3], python_url)
red_version = "[{}]({})".format(__version__, red_pypi)
app_info = await self.bot.application_info()
owner = app_info.owner
async with aiohttp.ClientSession() as session:
async with session.get('{}/json'.format(red_pypi)) as r:
async with session.get("{}/json".format(red_pypi)) as r:
data = await r.json()
outdated = StrictVersion(data["info"]["version"]) > StrictVersion(__version__)
about = (
@@ -89,7 +90,8 @@ class Core:
"Red is backed by a passionate community who contributes and "
"creates content for everyone to enjoy. [Join us today]({}) "
"and help us improve!\n\n"
"".format(red_repo, author_repo, org_repo, support_server_url))
"".format(red_repo, author_repo, org_repo, support_server_url)
)
embed = discord.Embed(color=discord.Color.red())
embed.add_field(name="Instance owned by", value=str(owner))
@@ -97,14 +99,14 @@ class Core:
embed.add_field(name="discord.py", value=dpy_version)
embed.add_field(name="Red version", value=red_version)
if outdated:
embed.add_field(name="Outdated", value="Yes, {} is available".format(
data["info"]["version"]
)
embed.add_field(
name="Outdated", value="Yes, {} is available".format(data["info"]["version"])
)
embed.add_field(name="About Red", value=about, inline=False)
embed.set_footer(text="Bringing joy since 02 Jan 2016 (over "
"{} days ago!)".format(days_since))
embed.set_footer(
text="Bringing joy since 02 Jan 2016 (over " "{} days ago!)".format(days_since)
)
try:
await ctx.send(embed=embed)
except discord.HTTPException:
@@ -115,11 +117,7 @@ class Core:
"""Shows Red's uptime"""
since = ctx.bot.uptime.strftime("%Y-%m-%d %H:%M:%S")
passed = self.get_bot_uptime()
await ctx.send(
"Been up for: **{}** (since {} UTC)".format(
passed, since
)
)
await ctx.send("Been up for: **{}** (since {} UTC)".format(passed, since))
def get_bot_uptime(self, *, brief=False):
# Courtesy of Danny
@@ -131,13 +129,13 @@ class Core:
if not brief:
if days:
fmt = '{d} days, {h} hours, {m} minutes, and {s} seconds'
fmt = "{d} days, {h} hours, {m} minutes, and {s} seconds"
else:
fmt = '{h} hours, {m} minutes, and {s} seconds'
fmt = "{h} hours, {m} minutes, and {s} seconds"
else:
fmt = '{h}h {m}m {s}s'
fmt = "{h}h {m}m {s}s"
if days:
fmt = '{d}d ' + fmt
fmt = "{d}d " + fmt
return fmt.format(d=days, h=hours, m=minutes, s=seconds)
@@ -176,14 +174,12 @@ class Core:
current = await self.bot.db.embeds()
await self.bot.db.embeds.set(not current)
await ctx.send(
_("Embeds are now {} by default.").format(
"disabled" if current else "enabled"
)
_("Embeds are now {} by default.").format("disabled" if current else "enabled")
)
@embedset.command(name="guild")
@checks.guildowner_or_permissions(administrator=True)
async def embedset_guild(self, ctx: commands.Context, enabled: bool=None):
async def embedset_guild(self, ctx: commands.Context, enabled: bool = None):
"""
Toggle the guild's embed setting.
@@ -197,18 +193,14 @@ class Core:
"""
await self.bot.db.guild(ctx.guild).embeds.set(enabled)
if enabled is None:
await ctx.send(
_("Embeds will now fall back to the global setting.")
)
await ctx.send(_("Embeds will now fall back to the global setting."))
else:
await ctx.send(
_("Embeds are now {} for this guild.").format(
"enabled" if enabled else "disabled"
)
_("Embeds are now {} for this guild.").format("enabled" if enabled else "disabled")
)
@embedset.command(name="user")
async def embedset_user(self, ctx: commands.Context, enabled: bool=None):
async def embedset_user(self, ctx: commands.Context, enabled: bool = None):
"""
Toggle the user's embed setting.
@@ -222,19 +214,15 @@ class Core:
"""
await self.bot.db.user(ctx.author).embeds.set(enabled)
if enabled is None:
await ctx.send(
_("Embeds will now fall back to the global setting.")
)
await ctx.send(_("Embeds will now fall back to the global setting."))
else:
await ctx.send(
_("Embeds are now {} for you.").format(
"enabled" if enabled else "disabled"
)
_("Embeds are now {} for you.").format("enabled" if enabled else "disabled")
)
@commands.command()
@checks.is_owner()
async def traceback(self, ctx, public: bool=False):
async def traceback(self, ctx, public: bool = False):
"""Sends to the owner the last command exception that has occurred
If public (yes is specified), it will be sent to the chat instead"""
@@ -267,8 +255,7 @@ class Core:
author = ctx.author
guild = ctx.guild
await ctx.send("Are you sure you want me to leave this server?"
" Type yes to confirm.")
await ctx.send("Are you sure you want me to leave this server?" " Type yes to confirm.")
def conf_check(m):
return m.author == author
@@ -285,15 +272,14 @@ class Core:
async def servers(self, ctx):
"""Lists and allows to leave servers"""
owner = ctx.author
guilds = sorted(list(self.bot.guilds),
key=lambda s: s.name.lower())
guilds = sorted(list(self.bot.guilds), key=lambda s: s.name.lower())
msg = ""
for i, server in enumerate(guilds, 1):
msg += "{}: {}\n".format(i, server.name)
msg += "\nTo leave a server, just type its number."
for page in pagify(msg, ['\n']):
for page in pagify(msg, ["\n"]):
await ctx.send(page)
def msg_check(m):
@@ -343,7 +329,7 @@ class Core:
loaded_packages = []
notfound_packages = []
cognames = [c.strip() for c in cog_name.split(' ')]
cognames = [c.strip() for c in cog_name.split(" ")]
cogspecs = []
for c in cognames:
@@ -352,20 +338,22 @@ class Core:
cogspecs.append((spec, c))
except RuntimeError:
notfound_packages.append(inline(c))
#await ctx.send(_("No module named '{}' was found in any"
# await ctx.send(_("No module named '{}' was found in any"
# " cog path.").format(c))
if len(cogspecs) > 0:
for spec, name in cogspecs:
for spec, name in cogspecs:
try:
await ctx.bot.load_extension(spec)
except Exception as e:
log.exception("Package loading failed", exc_info=e)
exception_log = ("Exception in command '{}'\n"
"".format(ctx.command.qualified_name))
exception_log += "".join(traceback.format_exception(type(e),
e, e.__traceback__))
exception_log = (
"Exception in command '{}'\n" "".format(ctx.command.qualified_name)
)
exception_log += "".join(
traceback.format_exception(type(e), e, e.__traceback__)
)
self.bot._last_exception = exception_log
failed_packages.append(inline(name))
else:
@@ -378,21 +366,23 @@ class Core:
await ctx.send(_(formed))
if failed_packages:
fmt = ("Failed to load package{plural} {packs}. Check your console or "
"logs for details.")
fmt = (
"Failed to load package{plural} {packs}. Check your console or "
"logs for details."
)
formed = self.get_package_strings(failed_packages, fmt)
await ctx.send(_(formed))
if notfound_packages:
fmt = 'The package{plural} {packs} {other} not found in any cog path.'
formed = self.get_package_strings(notfound_packages, fmt, ('was', 'were'))
fmt = "The package{plural} {packs} {other} not found in any cog path."
formed = self.get_package_strings(notfound_packages, fmt, ("was", "were"))
await ctx.send(_(formed))
@commands.group()
@checks.is_owner()
async def unload(self, ctx, *, cog_name: str):
"""Unloads packages"""
cognames = [c.strip() for c in cog_name.split(' ')]
cognames = [c.strip() for c in cog_name.split(" ")]
failed_packages = []
unloaded_packages = []
@@ -406,12 +396,12 @@ class Core:
if unloaded_packages:
fmt = "Package{plural} {packs} {other} unloaded."
formed = self.get_package_strings(unloaded_packages, fmt, ('was', 'were'))
formed = self.get_package_strings(unloaded_packages, fmt, ("was", "were"))
await ctx.send(_(formed))
if failed_packages:
fmt = "The package{plural} {packs} {other} not loaded."
formed = self.get_package_strings(failed_packages, fmt, ('is', 'are'))
formed = self.get_package_strings(failed_packages, fmt, ("is", "are"))
await ctx.send(_(formed))
@commands.command(name="reload")
@@ -419,7 +409,7 @@ class Core:
async def _reload(self, ctx, *, cog_name: str):
"""Reloads packages"""
cognames = [c.strip() for c in cog_name.split(' ')]
cognames = [c.strip() for c in cog_name.split(" ")]
for c in cognames:
ctx.bot.unload_extension(c)
@@ -444,50 +434,46 @@ class Core:
except Exception as e:
log.exception("Package reloading failed", exc_info=e)
exception_log = ("Exception in command '{}'\n"
"".format(ctx.command.qualified_name))
exception_log += "".join(traceback.format_exception(type(e),
e, e.__traceback__))
exception_log = (
"Exception in command '{}'\n" "".format(ctx.command.qualified_name)
)
exception_log += "".join(traceback.format_exception(type(e), e, e.__traceback__))
self.bot._last_exception = exception_log
failed_packages.append(inline(name))
if loaded_packages:
fmt = "Package{plural} {packs} {other} reloaded."
formed = self.get_package_strings(loaded_packages, fmt, ('was', 'were'))
formed = self.get_package_strings(loaded_packages, fmt, ("was", "were"))
await ctx.send(_(formed))
if failed_packages:
fmt = ("Failed to reload package{plural} {packs}. Check your "
"logs for details")
fmt = ("Failed to reload package{plural} {packs}. Check your " "logs for details")
formed = self.get_package_strings(failed_packages, fmt)
await ctx.send(_(formed))
if notfound_packages:
fmt = 'The package{plural} {packs} {other} not found in any cog path.'
formed = self.get_package_strings(notfound_packages, fmt, ('was', 'were'))
fmt = "The package{plural} {packs} {other} not found in any cog path."
formed = self.get_package_strings(notfound_packages, fmt, ("was", "were"))
await ctx.send(_(formed))
def get_package_strings(self, packages: list, fmt: str, other: tuple=None):
def get_package_strings(self, packages: list, fmt: str, other: tuple = None):
"""
Gets the strings needed for the load, unload and reload commands
"""
if other is None:
other = ('', '')
plural = 's' if len(packages) > 1 else ''
use_and, other = ('', other[0]) if len(packages) == 1 else (' and ', other[1])
packages_string = ', '.join(packages[:-1]) + use_and + packages[-1]
other = ("", "")
plural = "s" if len(packages) > 1 else ""
use_and, other = ("", other[0]) if len(packages) == 1 else (" and ", other[1])
packages_string = ", ".join(packages[:-1]) + use_and + packages[-1]
form = {'plural': plural,
'packs' : packages_string,
'other' : other
}
form = {"plural": plural, "packs": packages_string, "other": other}
final_string = fmt.format(**form)
return final_string
@commands.command(name="shutdown")
@checks.is_owner()
async def _shutdown(self, ctx, silently: bool=False):
async def _shutdown(self, ctx, silently: bool = False):
"""Shuts down the bot"""
wave = "\N{WAVING HAND SIGN}"
skin = "\N{EMOJI MODIFIER FITZPATRICK TYPE-3}"
@@ -500,7 +486,7 @@ class Core:
@commands.command(name="restart")
@checks.is_owner()
async def _restart(self, ctx, silently: bool=False):
async def _restart(self, ctx, silently: bool = False):
"""Attempts to restart Red
Makes Red quit with exit code 26
@@ -515,7 +501,7 @@ class Core:
def cleanup_and_refresh_modules(self, module_name: str):
"""Interally reloads modules so that changes are detected"""
splitted = module_name.split('.')
splitted = module_name.split(".")
def maybe_reload(new_name):
try:
@@ -553,9 +539,11 @@ class Core:
"Mod role: {}\n"
"Locale: {}"
"".format(
ctx.bot.user.name, " ".join(prefixes),
ctx.bot.user.name,
" ".join(prefixes),
admin_role.name if admin_role else "Not set",
mod_role.name if mod_role else "Not set", locale
mod_role.name if mod_role else "Not set",
locale,
)
)
await ctx.send(box(settings))
@@ -588,9 +576,13 @@ class Core:
try:
await ctx.bot.user.edit(avatar=data)
except discord.HTTPException:
await ctx.send(_("Failed. Remember that you can edit my avatar "
"up to two times a hour. The URL must be a "
"direct link to a JPG / PNG."))
await ctx.send(
_(
"Failed. Remember that you can edit my avatar "
"up to two times a hour. The URL must be a "
"direct link to a JPG / PNG."
)
)
except discord.InvalidArgument:
await ctx.send(_("JPG / PNG format only."))
else:
@@ -599,26 +591,24 @@ class Core:
@_set.command(name="game")
@checks.bot_in_a_guild()
@checks.is_owner()
async def _game(self, ctx, *, game: str=None):
async def _game(self, ctx, *, game: str = None):
"""Sets Red's playing status"""
if game:
game = discord.Game(name=game)
else:
game = None
status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 \
else discord.Status.online
status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 else discord.Status.online
await ctx.bot.change_presence(status=status, activity=game)
await ctx.send(_("Game set."))
@_set.command(name="listening")
@checks.bot_in_a_guild()
@checks.is_owner()
async def _listening(self, ctx, *, listening: str=None):
async def _listening(self, ctx, *, listening: str = None):
"""Sets Red's listening status"""
status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 \
else discord.Status.online
status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 else discord.Status.online
if listening:
activity = discord.Activity(name=listening, type=discord.ActivityType.listening)
else:
@@ -629,11 +619,10 @@ class Core:
@_set.command(name="watching")
@checks.bot_in_a_guild()
@checks.is_owner()
async def _watching(self, ctx, *, watching: str=None):
async def _watching(self, ctx, *, watching: str = None):
"""Sets Red's watching status"""
status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 \
else discord.Status.online
status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 else discord.Status.online
if watching:
activity = discord.Activity(name=watching, type=discord.ActivityType.watching)
else:
@@ -658,7 +647,7 @@ class Core:
"online": discord.Status.online,
"idle": discord.Status.idle,
"dnd": discord.Status.dnd,
"invisible": discord.Status.invisible
"invisible": discord.Status.invisible,
}
game = ctx.bot.guilds[0].me.activity if len(ctx.bot.guilds) > 0 else None
@@ -677,8 +666,7 @@ class Core:
"""Sets Red's streaming status
Leaving both streamer and stream_title empty will clear it."""
status = ctx.bot.guilds[0].me.status \
if len(ctx.bot.guilds) > 0 else None
status = ctx.bot.guilds[0].me.status if len(ctx.bot.guilds) > 0 else None
if stream_title:
stream_title = stream_title.strip()
@@ -700,23 +688,28 @@ class Core:
try:
await ctx.bot.user.edit(username=username)
except discord.HTTPException:
await ctx.send(_("Failed to change name. Remember that you can "
"only do it up to 2 times an hour. Use "
"nicknames if you need frequent changes. "
"`{}set nickname`").format(ctx.prefix))
await ctx.send(
_(
"Failed to change name. Remember that you can "
"only do it up to 2 times an hour. Use "
"nicknames if you need frequent changes. "
"`{}set nickname`"
).format(
ctx.prefix
)
)
else:
await ctx.send(_("Done."))
@_set.command(name="nickname")
@checks.admin()
@commands.guild_only()
async def _nickname(self, ctx, *, nickname: str=None):
async def _nickname(self, ctx, *, nickname: str = None):
"""Sets Red's nickname"""
try:
await ctx.guild.me.edit(nick=nickname)
except discord.Forbidden:
await ctx.send(_("I lack the permissions to change my own "
"nickname."))
await ctx.send(_("I lack the permissions to change my own " "nickname."))
else:
await ctx.send("Done.")
@@ -748,6 +741,7 @@ class Core:
@commands.cooldown(1, 60 * 10, commands.BucketType.default)
async def owner(self, ctx):
"""Sets Red's main owner"""
def check(m):
return m.author == ctx.author and m.channel == ctx.channel
@@ -759,20 +753,22 @@ class Core:
for i in range(length):
token += random.choice(chars)
log.info("{0} ({0.id}) requested to be set as owner."
"".format(ctx.author))
log.info("{0} ({0.id}) requested to be set as owner." "".format(ctx.author))
print(_("\nVerification token:"))
print(token)
await ctx.send(_("Remember:\n") + OWNER_DISCLAIMER)
await asyncio.sleep(5)
await ctx.send(_("I have printed a one-time token in the console. "
"Copy and paste it here to confirm you are the owner."))
await ctx.send(
_(
"I have printed a one-time token in the console. "
"Copy and paste it here to confirm you are the owner."
)
)
try:
message = await ctx.bot.wait_for("message", check=check,
timeout=60)
message = await ctx.bot.wait_for("message", check=check, timeout=60)
except asyncio.TimeoutError:
self.owner.reset_cooldown(ctx)
await ctx.send(_("The set owner request has timed out."))
@@ -798,10 +794,15 @@ class Core:
pass
await ctx.send(
_("Please use that command in DM. Since users probably saw your token,"
" it is recommended to reset it right now. Go to the following link and"
" select `Reveal Token` and `Generate a new token?`."
"\n\nhttps://discordapp.com/developers/applications/me/{}").format(self.bot.user.id))
_(
"Please use that command in DM. Since users probably saw your token,"
" it is recommended to reset it right now. Go to the following link and"
" select `Reveal Token` and `Generate a new token?`."
"\n\nhttps://discordapp.com/developers/applications/me/{}"
).format(
self.bot.user.id
)
)
return
await ctx.bot.db.token.set(token)
@@ -854,9 +855,7 @@ class Core:
locale_list = sorted(set([loc.stem for loc in list(red_path.glob("**/*.po"))]))
pages = pagify("\n".join(locale_list))
await ctx.send_interactive(
pages, box_lang="Available Locales:"
)
await ctx.send_interactive(pages, box_lang="Available Locales:")
@commands.command()
@checks.is_owner()
@@ -864,9 +863,11 @@ class Core:
"""Creates a backup of all data for the instance."""
from redbot.core.data_manager import basic_config, instance_name
from redbot.core.drivers.red_json import JSON
data_dir = Path(basic_config["DATA_PATH"])
if basic_config["STORAGE_TYPE"] == "MongoDB":
from redbot.core.drivers.red_mongo import Mongo
m = Mongo("Core", **basic_config["STORAGE_DETAILS"])
db = m.db
collection_names = await db.collection_names(include_system_collections=False)
@@ -891,9 +892,9 @@ class Core:
os.chdir(str(data_dir.parent))
with tarfile.open(str(backup_file), "w:gz") as tar:
tar.add(data_dir.stem)
await ctx.send(_("A backup has been made of this instance. It is at {}.").format(
backup_file
))
await ctx.send(
_("A backup has been made of this instance. It is at {}.").format(backup_file)
)
else:
await ctx.send(_("That directory doesn't seem to exist..."))
@@ -902,8 +903,7 @@ class Core:
async def contact(self, ctx, *, message: str):
"""Sends a message to the owner"""
guild = ctx.message.guild
owner = discord.utils.get(ctx.bot.get_all_members(),
id=ctx.bot.owner_id)
owner = discord.utils.get(ctx.bot.get_all_members(), id=ctx.bot.owner_id)
author = ctx.message.author
footer = _("User ID: {}").format(author.id)
@@ -916,12 +916,11 @@ class Core:
# We need to grab the DM command prefix (global)
# Since it can also be set through cli flags, bot.db is not a reliable
# source. So we'll just mock a DM message instead.
fake_message = namedtuple('Message', 'guild')
fake_message = namedtuple("Message", "guild")
prefixes = await ctx.bot.command_prefix(ctx.bot, fake_message(guild=None))
prefix = prefixes[0]
content = _("Use `{}dm {} <text>` to reply to this user"
"").format(prefix, author.id)
content = _("Use `{}dm {} <text>` to reply to this user" "").format(prefix, author.id)
description = _("Sent by {} {}").format(author, source)
@@ -941,21 +940,21 @@ class Core:
try:
await owner.send(content, embed=e)
except discord.InvalidArgument:
await ctx.send(_("I cannot send your message, I'm unable to find "
"my owner... *sigh*"))
await ctx.send(
_("I cannot send your message, I'm unable to find " "my owner... *sigh*")
)
except:
await ctx.send(_("I'm unable to deliver your message. Sorry."))
else:
await ctx.send(_("Your message has been sent."))
else:
msg_text = (
"{}\nMessage:\n\n{}\n{}".format(description, message, footer)
)
msg_text = ("{}\nMessage:\n\n{}\n{}".format(description, message, footer))
try:
await owner.send("{}\n{}".format(content, box(msg_text)))
except discord.InvalidArgument:
await ctx.send(_("I cannot send your message, I'm unable to find "
"my owner... *sigh*"))
await ctx.send(
_("I cannot send your message, I'm unable to find " "my owner... *sigh*")
)
except:
await ctx.send(_("I'm unable to deliver your message. Sorry."))
else:
@@ -970,15 +969,18 @@ class Core:
To get a user id enable 'developer mode' in Discord's
settings, 'appearance' tab. Then right click a user
and copy their id"""
destination = discord.utils.get(ctx.bot.get_all_members(),
id=user_id)
destination = discord.utils.get(ctx.bot.get_all_members(), id=user_id)
if destination is None:
await ctx.send(_("Invalid ID or user not found. You can only "
"send messages to people I share a server "
"with."))
await ctx.send(
_(
"Invalid ID or user not found. You can only "
"send messages to people I share a server "
"with."
)
)
return
fake_message = namedtuple('Message', 'guild')
fake_message = namedtuple("Message", "guild")
prefixes = await ctx.bot.command_prefix(ctx.bot, fake_message(guild=None))
prefix = prefixes[0]
description = _("Owner of {}").format(ctx.bot.user)
@@ -995,8 +997,9 @@ class Core:
try:
await destination.send(embed=e)
except:
await ctx.send(_("Sorry, I couldn't deliver your message "
"to {}").format(destination))
await ctx.send(
_("Sorry, I couldn't deliver your message " "to {}").format(destination)
)
else:
await ctx.send(_("Message delivered to {}").format(destination))
else:
@@ -1004,8 +1007,9 @@ class Core:
try:
await destination.send("{}\n{}".format(box(response), content))
except:
await ctx.send(_("Sorry, I couldn't deliver your message "
"to {}").format(destination))
await ctx.send(
_("Sorry, I couldn't deliver your message " "to {}").format(destination)
)
else:
await ctx.send(_("Message delivered to {}").format(destination))
@@ -1018,7 +1022,7 @@ class Core:
if ctx.invoked_subcommand is None:
await ctx.send_help()
@whitelist.command(name='add')
@whitelist.command(name="add")
async def whitelist_add(self, ctx, user: discord.User):
"""
Adds a user to the whitelist.
@@ -1029,7 +1033,7 @@ class Core:
await ctx.send(_("User added to whitelist."))
@whitelist.command(name='list')
@whitelist.command(name="list")
async def whitelist_list(self, ctx):
"""
Lists whitelisted users.
@@ -1043,7 +1047,7 @@ class Core:
for page in pagify(msg):
await ctx.send(box(page))
@whitelist.command(name='remove')
@whitelist.command(name="remove")
async def whitelist_remove(self, ctx, user: discord.User):
"""
Removes user from whitelist.
@@ -1060,7 +1064,7 @@ class Core:
else:
await ctx.send(_("User was not in the whitelist."))
@whitelist.command(name='clear')
@whitelist.command(name="clear")
async def whitelist_clear(self, ctx):
"""
Clears the whitelist.
@@ -1077,7 +1081,7 @@ class Core:
if ctx.invoked_subcommand is None:
await ctx.send_help()
@blacklist.command(name='add')
@blacklist.command(name="add")
async def blacklist_add(self, ctx, user: discord.User):
"""
Adds a user to the blacklist.
@@ -1092,7 +1096,7 @@ class Core:
await ctx.send(_("User added to blacklist."))
@blacklist.command(name='list')
@blacklist.command(name="list")
async def blacklist_list(self, ctx):
"""
Lists blacklisted users.
@@ -1106,7 +1110,7 @@ class Core:
for page in pagify(msg):
await ctx.send(box(page))
@blacklist.command(name='remove')
@blacklist.command(name="remove")
async def blacklist_remove(self, ctx, user: discord.User):
"""
Removes user from blacklist.
@@ -1123,7 +1127,7 @@ class Core:
else:
await ctx.send(_("User was not in the blacklist."))
@blacklist.command(name='clear')
@blacklist.command(name="clear")
async def blacklist_clear(self, ctx):
"""
Clears the blacklist.

View File

@@ -14,9 +14,15 @@ from .utils import TYPE_CHECKING
if TYPE_CHECKING:
from . import Config
__all__ = ['load_basic_configuration', 'cog_data_path', 'core_data_path',
'load_bundled_data', 'bundled_data_path', 'storage_details',
'storage_type']
__all__ = [
"load_basic_configuration",
"cog_data_path",
"core_data_path",
"load_bundled_data",
"bundled_data_path",
"storage_details",
"storage_type",
]
log = logging.getLogger("red.data_manager")
@@ -25,20 +31,16 @@ basic_config = None
instance_name = None
basic_config_default = {
"DATA_PATH": None,
"COG_PATH_APPEND": "cogs",
"CORE_PATH_APPEND": "core"
}
basic_config_default = {"DATA_PATH": None, "COG_PATH_APPEND": "cogs", "CORE_PATH_APPEND": "core"}
config_dir = None
appdir = appdirs.AppDirs("Red-DiscordBot")
if sys.platform == 'linux':
if sys.platform == "linux":
if 0 < os.getuid() < 1000:
config_dir = Path(appdir.site_data_dir)
if not config_dir:
config_dir = Path(appdir.user_config_dir)
config_file = config_dir / 'config.json'
config_file = config_dir / "config.json"
def load_basic_configuration(instance_name_: str):
@@ -67,20 +69,23 @@ def load_basic_configuration(instance_name_: str):
config = jsonio._load_json()
basic_config = config[instance_name]
except (FileNotFoundError, KeyError):
print("You need to configure the bot instance using `redbot-setup`"
" prior to running the bot.")
print(
"You need to configure the bot instance using `redbot-setup`"
" prior to running the bot."
)
sys.exit(1)
def _base_data_path() -> Path:
if basic_config is None:
raise RuntimeError("You must load the basic config before you"
" can get the base data path.")
path = basic_config['DATA_PATH']
raise RuntimeError(
"You must load the basic config before you" " can get the base data path."
)
path = basic_config["DATA_PATH"]
return Path(path).resolve()
def cog_data_path(cog_instance=None, raw_name: str=None) -> Path:
def cog_data_path(cog_instance=None, raw_name: str = None) -> Path:
"""Gets the base cog data path. If you want to get the folder with
which to store your own cog's data please pass in an instance
of your cog class.
@@ -104,9 +109,10 @@ def cog_data_path(cog_instance=None, raw_name: str=None) -> Path:
try:
base_data_path = Path(_base_data_path())
except RuntimeError as e:
raise RuntimeError("You must load the basic config before you"
" can get the cog data path.") from e
cog_path = base_data_path / basic_config['COG_PATH_APPEND']
raise RuntimeError(
"You must load the basic config before you" " can get the cog data path."
) from e
cog_path = base_data_path / basic_config["COG_PATH_APPEND"]
if raw_name is not None:
cog_path = cog_path / raw_name
@@ -121,9 +127,10 @@ def core_data_path() -> Path:
try:
base_data_path = Path(_base_data_path())
except RuntimeError as e:
raise RuntimeError("You must load the basic config before you"
" can get the core data path.") from e
core_path = base_data_path / basic_config['CORE_PATH_APPEND']
raise RuntimeError(
"You must load the basic config before you" " can get the core data path."
) from e
core_path = base_data_path / basic_config["CORE_PATH_APPEND"]
core_path.mkdir(exist_ok=True, parents=True)
return core_path.resolve()
@@ -145,15 +152,13 @@ def _find_data_files(init_location: str) -> (Path, List[Path]):
if not init_file.is_file():
return []
package_folder = init_file.parent.resolve() / 'data'
package_folder = init_file.parent.resolve() / "data"
if not package_folder.is_dir():
return []
all_files = list(package_folder.rglob("*"))
return package_folder, [p.resolve()
for p in all_files
if p.is_file()]
return package_folder, [p.resolve() for p in all_files if p.is_file()]
def _compare_and_copy(to_copy: List[Path], bundled_data_dir: Path, cog_data_dir: Path):
@@ -181,27 +186,24 @@ def _compare_and_copy(to_copy: List[Path], bundled_data_dir: Path, cog_data_dir:
yield block
block = afile.read(blocksize)
lookup = {p: cog_data_dir.joinpath(p.relative_to(bundled_data_dir))
for p in to_copy}
lookup = {p: cog_data_dir.joinpath(p.relative_to(bundled_data_dir)) for p in to_copy}
for orig, poss_existing in lookup.items():
if not poss_existing.is_file():
poss_existing.parent.mkdir(exist_ok=True, parents=True)
exists_checksum = None
else:
exists_checksum = hash_bytestr_iter(file_as_blockiter(
poss_existing.open('rb')), hashlib.sha256())
exists_checksum = hash_bytestr_iter(
file_as_blockiter(poss_existing.open("rb")), hashlib.sha256()
)
orig_checksum = ...
if exists_checksum is not None:
orig_checksum = hash_bytestr_iter(file_as_blockiter(
orig.open('rb')), hashlib.sha256())
orig_checksum = hash_bytestr_iter(file_as_blockiter(orig.open("rb")), hashlib.sha256())
if exists_checksum != orig_checksum:
shutil.copy(str(orig), str(poss_existing))
log.debug("Copying {} to {}".format(
orig, poss_existing
))
log.debug("Copying {} to {}".format(orig, poss_existing))
def load_bundled_data(cog_instance, init_location: str):
@@ -233,7 +235,7 @@ def load_bundled_data(cog_instance, init_location: str):
"""
bundled_data_folder, to_copy = _find_data_files(init_location)
cog_data_folder = cog_data_path(cog_instance) / 'bundled_data'
cog_data_folder = cog_data_path(cog_instance) / "bundled_data"
_compare_and_copy(to_copy, bundled_data_folder, cog_data_folder)
@@ -264,12 +266,10 @@ def bundled_data_path(cog_instance) -> Path:
If no bundled data folder exists or if it hasn't been loaded yet.
"""
bundled_path = cog_data_path(cog_instance) / 'bundled_data'
bundled_path = cog_data_path(cog_instance) / "bundled_data"
if not bundled_path.is_dir():
raise FileNotFoundError("No such directory {}".format(
bundled_path
))
raise FileNotFoundError("No such directory {}".format(bundled_path))
return bundled_path
@@ -282,9 +282,9 @@ def storage_type() -> str:
str
"""
try:
return basic_config['STORAGE_TYPE']
return basic_config["STORAGE_TYPE"]
except KeyError as e:
raise RuntimeError('Bot basic config has not been loaded yet.') from e
raise RuntimeError("Bot basic config has not been loaded yet.") from e
def storage_details() -> dict:
@@ -297,6 +297,6 @@ def storage_details() -> dict:
dict
"""
try:
return basic_config['STORAGE_DETAILS']
return basic_config["STORAGE_DETAILS"]
except KeyError as e:
raise RuntimeError('Bot basic config has not been loaded yet.') from e
raise RuntimeError("Bot basic config has not been loaded yet.") from e

View File

@@ -10,6 +10,7 @@ import discord
from . import checks, commands
from .i18n import Translator
from .utils.chat_formatting import box, pagify
"""
Notice:
@@ -32,11 +33,11 @@ class Dev:
def cleanup_code(content):
"""Automatically removes code blocks from the code."""
# remove ```py\n```
if content.startswith('```') and content.endswith('```'):
return '\n'.join(content.split('\n')[1:-1])
if content.startswith("```") and content.endswith("```"):
return "\n".join(content.split("\n")[1:-1])
# remove `foo`
return content.strip('` \n')
return content.strip("` \n")
@staticmethod
def get_syntax_error(e):
@@ -45,11 +46,10 @@ class Dev:
Returns a string representation of the error formatted as a codeblock.
"""
if e.text is None:
return box('{0.__class__.__name__}: {0}'.format(e), lang="py")
return box("{0.__class__.__name__}: {0}".format(e), lang="py")
return box(
'{0.text}{1:>{0.offset}}\n{2}: {0}'
''.format(e, '^', type(e).__name__),
lang="py")
"{0.text}{1:>{0.offset}}\n{2}: {0}" "".format(e, "^", type(e).__name__), lang="py"
)
@staticmethod
def get_pages(msg: str):
@@ -90,15 +90,15 @@ class Dev:
_ - The result of the last dev command.
"""
env = {
'bot': ctx.bot,
'ctx': ctx,
'channel': ctx.channel,
'author': ctx.author,
'guild': ctx.guild,
'message': ctx.message,
'discord': discord,
'commands': commands,
'_': self._last_result
"bot": ctx.bot,
"ctx": ctx,
"channel": ctx.channel,
"author": ctx.author,
"guild": ctx.guild,
"message": ctx.message,
"discord": discord,
"commands": commands,
"_": self._last_result,
}
code = self.cleanup_code(code)
@@ -109,8 +109,7 @@ class Dev:
await ctx.send(self.get_syntax_error(e))
return
except Exception as e:
await ctx.send(
box('{}: {!s}'.format(type(e).__name__, e), lang='py'))
await ctx.send(box("{}: {!s}".format(type(e).__name__, e), lang="py"))
return
if asyncio.iscoroutine(result):
@@ -122,7 +121,7 @@ class Dev:
await ctx.send_interactive(self.get_pages(result), box_lang="py")
@commands.command(name='eval')
@commands.command(name="eval")
@checks.is_owner()
async def _eval(self, ctx, *, body: str):
"""Execute asynchronous code.
@@ -145,28 +144,28 @@ class Dev:
_ - The result of the last dev command.
"""
env = {
'bot': ctx.bot,
'ctx': ctx,
'channel': ctx.channel,
'author': ctx.author,
'guild': ctx.guild,
'message': ctx.message,
'discord': discord,
'commands': commands,
'_': self._last_result
"bot": ctx.bot,
"ctx": ctx,
"channel": ctx.channel,
"author": ctx.author,
"guild": ctx.guild,
"message": ctx.message,
"discord": discord,
"commands": commands,
"_": self._last_result,
}
body = self.cleanup_code(body)
stdout = io.StringIO()
to_compile = 'async def func():\n%s' % textwrap.indent(body, ' ')
to_compile = "async def func():\n%s" % textwrap.indent(body, " ")
try:
exec(to_compile, env)
except SyntaxError as e:
return await ctx.send(self.get_syntax_error(e))
func = env['func']
func = env["func"]
result = None
try:
with redirect_stdout(stdout):
@@ -199,43 +198,43 @@ class Dev:
async function.
"""
variables = {
'ctx': ctx,
'bot': ctx.bot,
'message': ctx.message,
'guild': ctx.guild,
'channel': ctx.channel,
'author': ctx.author,
'_': None,
"ctx": ctx,
"bot": ctx.bot,
"message": ctx.message,
"guild": ctx.guild,
"channel": ctx.channel,
"author": ctx.author,
"_": None,
}
if ctx.channel.id in self.sessions:
await ctx.send(_('Already running a REPL session in this channel. '
'Exit it with `quit`.'))
await ctx.send(
_("Already running a REPL session in this channel. " "Exit it with `quit`.")
)
return
self.sessions.add(ctx.channel.id)
await ctx.send(_('Enter code to execute or evaluate.'
' `exit()` or `quit` to exit.'))
await ctx.send(_("Enter code to execute or evaluate." " `exit()` or `quit` to exit."))
msg_check = lambda m: (m.author == ctx.author and
m.channel == ctx.channel and
m.content.startswith('`'))
msg_check = lambda m: (
m.author == ctx.author and m.channel == ctx.channel and m.content.startswith("`")
)
while True:
response = await ctx.bot.wait_for("message", check=msg_check)
cleaned = self.cleanup_code(response.content)
if cleaned in ('quit', 'exit', 'exit()'):
await ctx.send('Exiting.')
if cleaned in ("quit", "exit", "exit()"):
await ctx.send("Exiting.")
self.sessions.remove(ctx.channel.id)
return
executor = exec
if cleaned.count('\n') == 0:
if cleaned.count("\n") == 0:
# single statement, potentially 'eval'
try:
code = compile(cleaned, '<repl session>', 'eval')
code = compile(cleaned, "<repl session>", "eval")
except SyntaxError:
pass
else:
@@ -243,12 +242,12 @@ class Dev:
if executor is exec:
try:
code = compile(cleaned, '<repl session>', 'exec')
code = compile(cleaned, "<repl session>", "exec")
except SyntaxError as e:
await ctx.send(self.get_syntax_error(e))
continue
variables['message'] = response
variables["message"] = response
stdout = io.StringIO()
@@ -266,7 +265,7 @@ class Dev:
value = stdout.getvalue()
if result is not None:
msg = "{}{}".format(value, result)
variables['_'] = result
variables["_"] = result
elif value:
msg = "{}".format(value)
@@ -277,7 +276,7 @@ class Dev:
except discord.Forbidden:
pass
except discord.HTTPException as e:
await ctx.send(_('Unexpected error: `{}`').format(e))
await ctx.send(_("Unexpected error: `{}`").format(e))
@commands.command()
@checks.is_owner()
@@ -290,7 +289,7 @@ class Dev:
msg.author = user
msg.content = ctx.prefix + command
ctx.bot.dispatch('message', msg)
ctx.bot.dispatch("message", msg)
@commands.command(name="mockmsg")
@checks.is_owner()

View File

@@ -24,8 +24,10 @@ def get_driver(type, *args, **kwargs):
"""
if type == "JSON":
from .red_json import JSON
return JSON(*args, **kwargs)
elif type == "MongoDB":
from .red_mongo import Mongo
return Mongo(*args, **kwargs)
raise RuntimeError("Invalid driver type: '{}'".format(type))

View File

@@ -2,6 +2,7 @@ __all__ = ["BaseDriver"]
class BaseDriver:
def __init__(self, cog_name, identifier):
self.cog_name = cog_name
self.unique_cog_identifier = identifier

View File

@@ -44,14 +44,21 @@ class JSON(BaseDriver):
The path in which to store the file indicated by :py:attr:`file_name`.
"""
def __init__(self, cog_name, identifier, *, data_path_override: Path=None,
file_name_override: str="settings.json"):
def __init__(
self,
cog_name,
identifier,
*,
data_path_override: Path = None,
file_name_override: str = "settings.json"
):
super().__init__(cog_name, identifier)
self.file_name = file_name_override
if data_path_override:
self.data_path = data_path_override
else:
self.data_path = Path.cwd() / 'cogs' / '.data' / self.cog_name
self.data_path = Path.cwd() / "cogs" / ".data" / self.cog_name
self.data_path.mkdir(parents=True, exist_ok=True)

View File

@@ -8,21 +8,16 @@ _conn = None
def _initialize(**kwargs):
host = kwargs['HOST']
port = kwargs['PORT']
admin_user = kwargs['USERNAME']
admin_pass = kwargs['PASSWORD']
db_name = kwargs.get('DB_NAME', 'default_db')
host = kwargs["HOST"]
port = kwargs["PORT"]
admin_user = kwargs["USERNAME"]
admin_pass = kwargs["PASSWORD"]
db_name = kwargs.get("DB_NAME", "default_db")
if admin_user is not None and admin_pass is not None:
url = "mongodb://{}:{}@{}:{}/{}".format(
admin_user, admin_pass, host, port,
db_name
)
url = "mongodb://{}:{}@{}:{}/{}".format(admin_user, admin_pass, host, port, db_name)
else:
url = "mongodb://{}:{}/{}".format(
host, port, db_name
)
url = "mongodb://{}:{}/{}".format(host, port, db_name)
global _conn
_conn = motor.motor_asyncio.AsyncIOMotorClient(url)
@@ -32,6 +27,7 @@ class Mongo(BaseDriver):
"""
Subclass of :py:class:`.red_base.BaseDriver`.
"""
def __init__(self, cog_name, identifier, **kwargs):
super().__init__(cog_name, identifier)
@@ -75,45 +71,40 @@ class Mongo(BaseDriver):
async def get(self, *identifiers: str):
mongo_collection = self.get_collection()
dot_identifiers = '.'.join(identifiers)
dot_identifiers = ".".join(identifiers)
partial = await mongo_collection.find_one(
filter={'_id': self.unique_cog_identifier},
projection={dot_identifiers: True}
filter={"_id": self.unique_cog_identifier}, projection={dot_identifiers: True}
)
if partial is None:
raise KeyError("No matching document was found and Config expects"
" a KeyError.")
raise KeyError("No matching document was found and Config expects" " a KeyError.")
for i in identifiers:
partial = partial[i]
return partial
async def set(self, *identifiers: str, value=None):
dot_identifiers = '.'.join(identifiers)
dot_identifiers = ".".join(identifiers)
mongo_collection = self.get_collection()
await mongo_collection.update_one(
{'_id': self.unique_cog_identifier},
{"_id": self.unique_cog_identifier},
update={"$set": {dot_identifiers: value}},
upsert=True
upsert=True,
)
async def clear(self, *identifiers: str):
dot_identifiers = '.'.join(identifiers)
dot_identifiers = ".".join(identifiers)
mongo_collection = self.get_collection()
if len(identifiers) > 0:
await mongo_collection.update_one(
{'_id': self.unique_cog_identifier},
update={"$unset": {dot_identifiers: 1}}
{"_id": self.unique_cog_identifier}, update={"$unset": {dot_identifiers: 1}}
)
else:
await mongo_collection.delete_one(
{'_id': self.unique_cog_identifier}
)
await mongo_collection.delete_one({"_id": self.unique_cog_identifier})
def get_config_details():
@@ -129,10 +120,10 @@ def get_config_details():
admin_uname = admin_password = None
ret = {
'HOST': host,
'PORT': port,
'USERNAME': admin_uname,
'PASSWORD': admin_password,
'DB_NAME': db_name
"HOST": host,
"PORT": port,
"USERNAME": admin_uname,
"PASSWORD": admin_password,
"DB_NAME": db_name,
}
return ret

View File

@@ -44,8 +44,8 @@ def should_log_sentry(exception) -> bool:
tb_frame = tb.tb_frame
tb = tb.tb_next
module = tb_frame.f_globals.get('__name__')
return module.startswith('redbot')
module = tb_frame.f_globals.get("__name__")
return module.startswith("redbot")
def init_events(bot, cli_flags):
@@ -77,8 +77,7 @@ def init_events(bot, cli_flags):
spec = await bot.cog_mgr.find_cog(package)
await bot.load_extension(spec)
except Exception as e:
log.exception("Failed to load package {}".format(package),
exc_info=e)
log.exception("Failed to load package {}".format(package), exc_info=e)
await bot.remove_loaded_package(package)
to_remove.append(package)
for package in to_remove:
@@ -104,18 +103,21 @@ def init_events(bot, cli_flags):
red_pkg = pkg_resources.get_distribution("Red-DiscordBot")
dpy_version = discord.__version__
INFO = [str(bot.user), "Prefixes: {}".format(', '.join(prefixes)),
'Language: {}'.format(lang),
"Red Bot Version: {}".format(red_version),
"Discord.py Version: {}".format(dpy_version),
"Shards: {}".format(bot.shard_count)]
INFO = [
str(bot.user),
"Prefixes: {}".format(", ".join(prefixes)),
"Language: {}".format(lang),
"Red Bot Version: {}".format(red_version),
"Discord.py Version: {}".format(dpy_version),
"Shards: {}".format(bot.shard_count),
]
if guilds:
INFO.extend(("Servers: {}".format(guilds), "Users: {}".format(users)))
else:
print("Ready. I'm not in any server yet!")
INFO.append('{} cogs with {} commands'.format(len(bot.cogs), len(bot.commands)))
INFO.append("{} cogs with {} commands".format(len(bot.cogs), len(bot.commands)))
async with aiohttp.ClientSession() as session:
async with session.get("https://pypi.python.org/pypi/red-discordbot/json") as r:
@@ -139,11 +141,7 @@ def init_events(bot, cli_flags):
sentry = await bot.db.enable_sentry()
mongo_enabled = storage_type() != "JSON"
reqs_installed = {
"voice": None,
"docs": None,
"test": None
}
reqs_installed = {"voice": None, "docs": None, "test": None}
for key in reqs_installed.keys():
reqs = [x.name for x in red_pkg._dep_map[key]]
try:
@@ -158,7 +156,7 @@ def init_events(bot, cli_flags):
("MongoDB", mongo_enabled),
("Voice", reqs_installed["voice"]),
("Docs", reqs_installed["docs"]),
("Tests", reqs_installed["test"])
("Tests", reqs_installed["test"]),
)
on_symbol, off_symbol, ascii_border = _get_startup_screen_specs()
@@ -201,21 +199,25 @@ def init_events(bot, cli_flags):
await ctx.send(msg)
return
"""
log.exception("Exception in command '{}'"
"".format(ctx.command.qualified_name),
exc_info=error.original)
log.exception(
"Exception in command '{}'" "".format(ctx.command.qualified_name),
exc_info=error.original,
)
if should_log_sentry(error):
sentry_log.exception("Exception in command '{}'"
"".format(ctx.command.qualified_name),
exc_info=error.original)
sentry_log.exception(
"Exception in command '{}'" "".format(ctx.command.qualified_name),
exc_info=error.original,
)
message = ("Error in command '{}'. Check your console or "
"logs for details."
"".format(ctx.command.qualified_name))
exception_log = ("Exception in command '{}'\n"
"".format(ctx.command.qualified_name))
exception_log += "".join(traceback.format_exception(type(error),
error, error.__traceback__))
message = (
"Error in command '{}'. Check your console or "
"logs for details."
"".format(ctx.command.qualified_name)
)
exception_log = ("Exception in command '{}'\n" "".format(ctx.command.qualified_name))
exception_log += "".join(
traceback.format_exception(type(error), error, error.__traceback__)
)
bot._last_exception = exception_log
if not hasattr(ctx.cog, "_{0.command.cog_name}__error".format(ctx)):
await ctx.send(inline(message))
@@ -226,9 +228,9 @@ def init_events(bot, cli_flags):
elif isinstance(error, commands.NoPrivateMessage):
await ctx.send("That command is not available in DMs.")
elif isinstance(error, commands.CommandOnCooldown):
await ctx.send("This command is on cooldown. "
"Try again in {:.2f}s"
"".format(error.retry_after))
await ctx.send(
"This command is on cooldown. " "Try again in {:.2f}s" "".format(error.retry_after)
)
else:
log.exception(type(error).__name__, exc_info=error)
try:
@@ -237,8 +239,7 @@ def init_events(bot, cli_flags):
sentry_error = error
if should_log_sentry(sentry_error):
sentry_log.exception("Unhandled command error.",
exc_info=sentry_error)
sentry_log.exception("Unhandled command error.", exc_info=sentry_error)
@bot.event
async def on_message(message):
@@ -253,6 +254,7 @@ def init_events(bot, cli_flags):
async def on_command(command):
bot.counter["processed_commands"] += 1
def _get_startup_screen_specs():
"""Get specs for displaying the startup screen on stdout.
@@ -278,11 +280,10 @@ def _get_startup_screen_specs():
off_symbol = "X"
try:
encoder('┌┐└┘─│') # border symbols
encoder("┌┐└┘─│") # border symbols
except UnicodeEncodeError:
ascii_border = True
else:
ascii_border = False
return on_symbol, off_symbol, ascii_border

View File

@@ -38,16 +38,13 @@ import traceback
from . import commands
EMPTY_STRING = u'\u200b'
EMPTY_STRING = u"\u200b"
_mentions_transforms = {
'@everyone': '@\u200beveryone',
'@here': '@\u200bhere'
}
_mentions_transforms = {"@everyone": "@\u200beveryone", "@here": "@\u200bhere"}
_mention_pattern = re.compile('|'.join(_mentions_transforms.keys()))
_mention_pattern = re.compile("|".join(_mentions_transforms.keys()))
EmbedField = namedtuple('EmbedField', 'name value inline')
EmbedField = namedtuple("EmbedField", "name value inline")
class Help(formatter.HelpFormatter):
@@ -71,7 +68,7 @@ class Help(formatter.HelpFormatter):
@property
def avatar(self):
return self.context.bot.user.avatar_url_as(format='png')
return self.context.bot.user.avatar_url_as(format="png")
@property
def color(self):
@@ -94,48 +91,41 @@ class Help(formatter.HelpFormatter):
if self.pm_check(self.context):
name = self.context.bot.user.name
else:
name = self.me.display_name if not '' else self.context.bot.user.name
author = {
'name': '{0} Help Manual'.format(name),
'icon_url': self.avatar
}
name = self.me.display_name if not "" else self.context.bot.user.name
author = {"name": "{0} Help Manual".format(name), "icon_url": self.avatar}
return author
def _add_subcommands(self, cmds):
entries = ''
entries = ""
for name, command in cmds:
if name in command.aliases:
# skip aliases
continue
if self.is_cog() or self.is_bot():
name = '{0}{1}'.format(self.clean_prefix, name)
name = "{0}{1}".format(self.clean_prefix, name)
entries += '**{0}** {1}\n'.format(name, command.short_doc)
entries += "**{0}** {1}\n".format(name, command.short_doc)
return entries
def get_ending_note(self):
# command_name = self.context.invoked_with
return "Type {0}help <command> for more info on a command.\n" \
"You can also type {0}help <category> for more info on a category.".format(self.clean_prefix)
return "Type {0}help <command> for more info on a command.\n" "You can also type {0}help <category> for more info on a category.".format(
self.clean_prefix
)
async def format(self) -> dict:
"""Formats command for output.
Returns a dict used to build embed"""
emb = {
'embed': {
'title': '',
'description': '',
},
'footer': {
'text': self.get_ending_note()
},
'fields': []
"embed": {"title": "", "description": ""},
"footer": {"text": self.get_ending_note()},
"fields": [],
}
if self.is_cog():
translator = getattr(self.command, '__translator__', lambda s: s)
translator = getattr(self.command, "__translator__", lambda s: s)
description = (
inspect.cleandoc(translator(self.command.__doc__))
if self.command.__doc__
@@ -144,27 +134,27 @@ class Help(formatter.HelpFormatter):
else:
description = self.command.description
if not description == '' and description is not None:
description = '*{0}*'.format(description)
if not description == "" and description is not None:
description = "*{0}*".format(description)
if description:
# <description> portion
emb['embed']['description'] = description[:2046]
emb["embed"]["description"] = description[:2046]
if isinstance(self.command, discord.ext.commands.core.Command):
# <signature portion>
emb['embed']['title'] = emb['embed']['description']
emb['embed']['description'] = '`Syntax: {0}`'.format(self.get_command_signature())
emb["embed"]["title"] = emb["embed"]["description"]
emb["embed"]["description"] = "`Syntax: {0}`".format(self.get_command_signature())
# <long doc> section
if self.command.help:
splitted = self.command.help.split('\n\n')
name = '__{0}__'.format(splitted[0])
value = '\n\n'.join(splitted[1:]).replace('[p]', self.clean_prefix)
if value == '':
splitted = self.command.help.split("\n\n")
name = "__{0}__".format(splitted[0])
value = "\n\n".join(splitted[1:]).replace("[p]", self.clean_prefix)
if value == "":
value = EMPTY_STRING
field = EmbedField(name[:252], value[:1024], False)
emb['fields'].append(field)
emb["fields"].append(field)
# end it here if it's just a regular command
if not self.has_subcommands():
@@ -173,7 +163,7 @@ class Help(formatter.HelpFormatter):
def category(tup):
# Turn get cog (Category) name from cog/list tuples
cog = tup[1].cog_name
return '**__{0}:__**'.format(cog) if cog is not None else '**__\u200bNo Category:__**'
return "**__{0}:__**".format(cog) if cog is not None else "**__\u200bNo Category:__**"
# Get subcommands for bot or category
filtered = await self.filter_command_list()
@@ -185,18 +175,21 @@ class Help(formatter.HelpFormatter):
commands_ = sorted(commands_)
if len(commands_) > 0:
field = EmbedField(category, self._add_subcommands(commands_), False)
emb['fields'].append(field)
emb["fields"].append(field)
else:
# Get list of commands for category
filtered = sorted(filtered)
if filtered:
field = EmbedField(
'**__Commands:__**' if not self.is_bot() and self.is_cog() else '**__Subcommands:__**',
"**__Commands:__**"
if not self.is_bot() and self.is_cog()
else "**__Subcommands:__**",
self._add_subcommands(filtered), # May need paginated
False)
False,
)
emb['fields'].append(field)
emb["fields"].append(field)
return emb
@@ -214,7 +207,7 @@ class Help(formatter.HelpFormatter):
return ret
async def format_help_for(self, ctx, command_or_bot, reason: str=None):
async def format_help_for(self, ctx, command_or_bot, reason: str = None):
"""Formats the help page and handles the actual heavy lifting of how ### WTF HAPPENED?
the help command looks like. To change the behaviour, override the
:meth:`~.HelpFormatter.format` method.
@@ -237,16 +230,18 @@ class Help(formatter.HelpFormatter):
emb = await self.format()
if reason:
emb['embed']['title'] = "{0}".format(reason)
emb["embed"]["title"] = "{0}".format(reason)
ret = []
field_groups = self.group_fields(emb['fields'])
field_groups = self.group_fields(emb["fields"])
for i, group in enumerate(field_groups, 1):
embed = discord.Embed(color=self.color, **emb['embed'])
embed = discord.Embed(color=self.color, **emb["embed"])
if len(field_groups) > 1:
description = "{} *- Page {} of {}*".format(embed.description, i, len(field_groups))
description = "{} *- Page {} of {}*".format(
embed.description, i, len(field_groups)
)
embed.description = description
embed.set_author(**self.author)
@@ -254,7 +249,7 @@ class Help(formatter.HelpFormatter):
for field in group:
embed.add_field(**field._asdict())
embed.set_footer(**emb['footer'])
embed.set_footer(**emb["footer"])
ret.append(embed)
@@ -275,18 +270,18 @@ class Help(formatter.HelpFormatter):
embed = self.simple_embed(
ctx,
title=ctx.bot.command_not_found.format(cmd),
description='Commands are case sensitive. Please check your spelling and try again',
color=color)
description="Commands are case sensitive. Please check your spelling and try again",
color=color,
)
return embed
def cmd_has_no_subcommands(self, ctx, cmd, color=None):
embed = self.simple_embed(
ctx,
title=ctx.bot.command_has_no_subcommands.format(cmd),
color=color
ctx, title=ctx.bot.command_has_no_subcommands.format(cmd), color=color
)
return embed
@commands.command()
async def help(ctx, *cmds: str):
"""Shows help documentation.
@@ -297,7 +292,8 @@ async def help(ctx, *cmds: str):
destination = ctx.author if ctx.bot.pm_help else ctx
def repl(obj):
return _mentions_transforms.get(obj.group(0), '')
return _mentions_transforms.get(obj.group(0), "")
use_embeds = await ctx.embed_requested()
f = formatter.HelpFormatter()
# help by itself just lists our own commands.
@@ -316,12 +312,9 @@ async def help(ctx, *cmds: str):
command = ctx.bot.all_commands.get(name)
if command is None:
if use_embeds:
await destination.send(
embed=ctx.bot.formatter.cmd_not_found(ctx, name))
await destination.send(embed=ctx.bot.formatter.cmd_not_found(ctx, name))
else:
await destination.send(
ctx.bot.command_not_found.format(name)
)
await destination.send(ctx.bot.command_not_found.format(name))
return
if use_embeds:
embeds = await ctx.bot.formatter.format_help_for(ctx, command)
@@ -332,12 +325,9 @@ async def help(ctx, *cmds: str):
command = ctx.bot.all_commands.get(name)
if command is None:
if use_embeds:
await destination.send(
embed=ctx.bot.formatter.cmd_not_found(ctx, name))
await destination.send(embed=ctx.bot.formatter.cmd_not_found(ctx, name))
else:
await destination.send(
ctx.bot.command_not_found.format(name)
)
await destination.send(ctx.bot.command_not_found.format(name))
return
for key in cmds[1:]:
@@ -346,12 +336,9 @@ async def help(ctx, *cmds: str):
command = command.all_commands.get(key)
if command is None:
if use_embeds:
await destination.send(
embed=ctx.bot.formatter.cmd_not_found(ctx, key))
await destination.send(embed=ctx.bot.formatter.cmd_not_found(ctx, key))
else:
await destination.send(
ctx.bot.command_not_found.format(key)
)
await destination.send(ctx.bot.command_not_found.format(key))
return
except AttributeError:
if use_embeds:
@@ -359,11 +346,11 @@ async def help(ctx, *cmds: str):
embed=ctx.bot.formatter.simple_embed(
ctx,
title='Command "{0.name}" has no subcommands.'.format(command),
color=ctx.bot.formatter.color))
else:
await destination.send(
ctx.bot.command_has_no_subcommands.format(command)
color=ctx.bot.formatter.color,
)
)
else:
await destination.send(ctx.bot.command_has_no_subcommands.format(command))
return
if use_embeds:
embeds = await ctx.bot.formatter.format_help_for(ctx, command)
@@ -391,5 +378,5 @@ async def help(ctx, *cmds: str):
@help.error
async def help_error(ctx, error):
destination = ctx.author if ctx.bot.pm_help else ctx
await destination.send('{0.__name__}: {1}'.format(type(error), error))
await destination.send("{0.__name__}: {1}".format(type(error), error))
traceback.print_tb(error.original.__traceback__, file=sys.stderr)

View File

@@ -3,10 +3,9 @@ from pathlib import Path
from . import commands
__all__ = ['get_locale', 'set_locale', 'reload_locales', 'cog_i18n',
'Translator']
__all__ = ["get_locale", "set_locale", "reload_locales", "cog_i18n", "Translator"]
_current_locale = 'en_us'
_current_locale = "en_us"
WAITING_FOR_MSGID = 1
IN_MSGID = 2
@@ -54,8 +53,8 @@ def _parse(translation_file):
if line.startswith(MSGID):
# Don't check if step is WAITING_FOR_MSGID
untranslated = ''
translated = ''
untranslated = ""
translated = ""
data = line[len(MSGID):-1]
if len(data) == 0: # Multiline mode
step = IN_MSGID
@@ -63,10 +62,9 @@ def _parse(translation_file):
untranslated += data
step = WAITING_FOR_MSGSTR
elif step is IN_MSGID and line.startswith('"') and \
line.endswith('"'):
elif step is IN_MSGID and line.startswith('"') and line.endswith('"'):
untranslated += line[1:-1]
elif step is IN_MSGID and untranslated == '': # Empty MSGID
elif step is IN_MSGID and untranslated == "": # Empty MSGID
step = WAITING_FOR_MSGID
elif step is IN_MSGID: # the MSGID is finished
step = WAITING_FOR_MSGSTR
@@ -79,16 +77,15 @@ def _parse(translation_file):
translations |= {(untranslated, data)}
step = WAITING_FOR_MSGID
elif step is IN_MSGSTR and line.startswith('"') and \
line.endswith('"'):
elif step is IN_MSGSTR and line.startswith('"') and line.endswith('"'):
translated += line[1:-1]
elif step is IN_MSGSTR: # the MSGSTR is finished
step = WAITING_FOR_MSGID
if translated == '':
if translated == "":
translated = untranslated
translations |= {(untranslated, translated)}
if step is IN_MSGSTR:
if translated == '':
if translated == "":
translated = untranslated
translations |= {(untranslated, translated)}
return translations
@@ -107,33 +104,34 @@ def _normalize(string, remove_newline=False):
:param remove_newline:
:return:
"""
def normalize_whitespace(s):
"""Normalizes the whitespace in a string; \s+ becomes one space."""
if not s:
return str(s) # not the same reference
starts_with_space = (s[0] in ' \n\t\r')
ends_with_space = (s[-1] in ' \n\t\r')
starts_with_space = (s[0] in " \n\t\r")
ends_with_space = (s[-1] in " \n\t\r")
if remove_newline:
newline_re = re.compile('[\r\n]+')
s = ' '.join(filter(bool, newline_re.split(s)))
s = ' '.join(filter(bool, s.split('\t')))
s = ' '.join(filter(bool, s.split(' ')))
newline_re = re.compile("[\r\n]+")
s = " ".join(filter(bool, newline_re.split(s)))
s = " ".join(filter(bool, s.split("\t")))
s = " ".join(filter(bool, s.split(" ")))
if starts_with_space:
s = ' ' + s
s = " " + s
if ends_with_space:
s += ' '
s += " "
return s
if string is None:
return ""
string = string.replace('\\n\\n', '\n\n')
string = string.replace('\\n', ' ')
string = string.replace("\\n\\n", "\n\n")
string = string.replace("\\n", " ")
string = string.replace('\\"', '"')
string = string.replace("\'", "'")
string = string.replace("'", "'")
string = normalize_whitespace(string)
string = string.strip('\n')
string = string.strip('\t')
string = string.strip("\n")
string = string.strip("\t")
return string
@@ -148,7 +146,7 @@ def get_locale_path(cog_folder: Path, extension: str) -> Path:
:return:
Path of possible localization file, it may not exist.
"""
return cog_folder / 'locales' / "{}.{}".format(get_locale(), extension)
return cog_folder / "locales" / "{}.{}".format(get_locale(), extension)
class Translator:
@@ -193,13 +191,13 @@ class Translator:
"""
self.translations = {}
translation_file = None
locale_path = get_locale_path(self.cog_folder, 'po')
locale_path = get_locale_path(self.cog_folder, "po")
try:
try:
translation_file = locale_path.open('ru', encoding='utf-8')
translation_file = locale_path.open("ru", encoding="utf-8")
except ValueError: # We are using Windows
translation_file = locale_path.open('r', encoding='utf-8')
translation_file = locale_path.open("r", encoding="utf-8")
self._parse(translation_file)
except (IOError, FileNotFoundError): # The translation is unavailable
pass
@@ -221,6 +219,7 @@ class Translator:
def cog_i18n(translator: Translator):
"""Get a class decorator to link the translator to this cog."""
def decorator(cog_class: type):
cog_class.__translator__ = translator
for name, attr in cog_class.__dict__.items():
@@ -228,4 +227,5 @@ def cog_i18n(translator: Translator):
attr.translator = translator
setattr(cog_class, name, attr)
return cog_class
return decorator

View File

@@ -11,13 +11,14 @@ from pathlib import Path
log = logging.getLogger("red")
PRETTY = {"indent": 4, "sort_keys": True, "separators": (',', ' : ')}
MINIFIED = {"sort_keys": True, "separators": (',', ':')}
PRETTY = {"indent": 4, "sort_keys": True, "separators": (",", " : ")}
MINIFIED = {"sort_keys": True, "separators": (",", ":")}
class JsonIO:
"""Basic functions for atomic saving / loading of json files"""
def __init__(self, path: Path=Path.cwd()):
def __init__(self, path: Path = Path.cwd()):
"""
:param path: Full path to file.
"""
@@ -43,7 +44,7 @@ class JsonIO:
# noinspection PyUnresolvedReferences
def _load_json(self):
log.debug("Reading file {}".format(self.path))
with self.path.open(encoding='utf-8', mode="r") as f:
with self.path.open(encoding="utf-8", mode="r") as f:
data = json.load(f)
return data

View File

@@ -1,17 +1,11 @@
import subprocess
TO_TRANSLATE = [
'../cog_manager.py',
'../core_commands.py',
'../dev_commands.py'
]
TO_TRANSLATE = ["../cog_manager.py", "../core_commands.py", "../dev_commands.py"]
def regen_messages():
subprocess.run(
['pygettext', '-n'] + TO_TRANSLATE
)
subprocess.run(["pygettext", "-n"] + TO_TRANSLATE)
if __name__ == "__main__":
regen_messages()
regen_messages()

View File

@@ -8,21 +8,24 @@ from redbot.core import Config
from redbot.core.bot import Red
__all__ = [
"Case", "CaseType", "get_next_case_number", "get_case", "get_all_cases",
"create_case", "get_casetype", "get_all_casetypes", "register_casetype",
"register_casetypes", "get_modlog_channel", "set_modlog_channel",
"reset_cases"
"Case",
"CaseType",
"get_next_case_number",
"get_case",
"get_all_cases",
"create_case",
"get_casetype",
"get_all_casetypes",
"register_casetype",
"register_casetypes",
"get_modlog_channel",
"set_modlog_channel",
"reset_cases",
]
_DEFAULT_GLOBAL = {
"casetypes": {}
}
_DEFAULT_GLOBAL = {"casetypes": {}}
_DEFAULT_GUILD = {
"mod_log": None,
"cases": {},
"casetypes": {}
}
_DEFAULT_GUILD = {"mod_log": None, "cases": {}, "casetypes": {}}
def _register_defaults():
@@ -30,8 +33,8 @@ def _register_defaults():
_conf.register_guild(**_DEFAULT_GUILD)
if not os.environ.get('BUILDING_DOCS'):
_conf = Config.get_conf(None, 1354799444, cog_name='ModLog')
if not os.environ.get("BUILDING_DOCS"):
_conf = Config.get_conf(None, 1354799444, cog_name="ModLog")
_register_defaults()
@@ -39,11 +42,20 @@ class Case:
"""A single mod log case"""
def __init__(
self, guild: discord.Guild, created_at: int, action_type: str,
user: discord.User, moderator: discord.Member, case_number: int,
reason: str=None, until: int=None,
channel: discord.TextChannel=None, amended_by: discord.Member=None,
modified_at: int=None, message: discord.Message=None):
self,
guild: discord.Guild,
created_at: int,
action_type: str,
user: discord.User,
moderator: discord.Member,
case_number: int,
reason: str = None,
until: int = None,
channel: discord.TextChannel = None,
amended_by: discord.Member = None,
modified_at: int = None,
message: discord.Message = None,
):
self.guild = guild
self.created_at = created_at
self.action_type = action_type
@@ -82,11 +94,9 @@ class Case:
else:
await self.message.edit(case_content)
await _conf.guild(self.guild).cases.set_raw(
str(self.case_number), value=self.to_json()
)
await _conf.guild(self.guild).cases.set_raw(str(self.case_number), value=self.to_json())
async def message_content(self, embed: bool=True):
async def message_content(self, embed: bool = True):
"""
Format a case message
@@ -102,22 +112,18 @@ class Case:
"""
casetype = await get_casetype(self.action_type)
title = "{}".format("Case #{} | {} {}".format(
self.case_number, casetype.case_str, casetype.image))
title = "{}".format(
"Case #{} | {} {}".format(self.case_number, casetype.case_str, casetype.image)
)
if self.reason:
reason = "**Reason:** {}".format(self.reason)
else:
reason = \
"**Reason:** Use `[p]reason {} <reason>` to add it".format(
self.case_number
)
reason = "**Reason:** Use `[p]reason {} <reason>` to add it".format(self.case_number)
if self.moderator is not None:
moderator = "{}#{} ({})\n".format(
self.moderator.name,
self.moderator.discriminator,
self.moderator.id
self.moderator.name, self.moderator.discriminator, self.moderator.id
)
else:
moderator = "Unknown"
@@ -126,7 +132,7 @@ class Case:
if self.until:
start = datetime.fromtimestamp(self.created_at)
end = datetime.fromtimestamp(self.until)
end_fmt = end.strftime('%Y-%m-%d %H:%M:%S')
end_fmt = end.strftime("%Y-%m-%d %H:%M:%S")
duration = end - start
dur_fmt = _strfdelta(duration)
until = end_fmt
@@ -135,21 +141,16 @@ class Case:
amended_by = None
if self.amended_by:
amended_by = "{}#{} ({})".format(
self.amended_by.name,
self.amended_by.discriminator,
self.amended_by.id
self.amended_by.name, self.amended_by.discriminator, self.amended_by.id
)
last_modified = None
if self.modified_at:
last_modified = "{}".format(
datetime.fromtimestamp(
self.modified_at
).strftime('%Y-%m-%d %H:%M:%S')
datetime.fromtimestamp(self.modified_at).strftime("%Y-%m-%d %H:%M:%S")
)
user = "{}#{} ({})\n".format(
self.user.name, self.user.discriminator, self.user.id)
user = "{}#{} ({})\n".format(self.user.name, self.user.discriminator, self.user.id)
if embed:
emb = discord.Embed(title=title, description=reason)
@@ -208,7 +209,7 @@ class Case:
"channel": self.channel.id if hasattr(self.channel, "id") else None,
"amended_by": self.amended_by.id if hasattr(self.amended_by, "id") else None,
"modified_at": self.modified_at,
"message": self.message.id if hasattr(self.message, "id") else None
"message": self.message.id if hasattr(self.message, "id") else None,
}
return data
@@ -239,11 +240,18 @@ class Case:
amended_by = guild.get_member(data["amended_by"])
case_guild = bot.get_guild(data["guild"])
return cls(
guild=case_guild, created_at=data["created_at"],
action_type=data["action_type"], user=user, moderator=moderator,
case_number=data["case_number"], reason=data["reason"],
until=data["until"], channel=channel, amended_by=amended_by,
modified_at=data["modified_at"], message=message
guild=case_guild,
created_at=data["created_at"],
action_type=data["action_type"],
user=user,
moderator=moderator,
case_number=data["case_number"],
reason=data["reason"],
until=data["until"],
channel=channel,
amended_by=amended_by,
modified_at=data["modified_at"],
message=message,
)
@@ -266,9 +274,16 @@ class CaseType:
The action type of the action as it would appear in the
audit log
"""
def __init__(
self, name: str, default_setting: bool, image: str,
case_str: str, audit_type: str=None, guild: discord.Guild=None):
self,
name: str,
default_setting: bool,
image: str,
case_str: str,
audit_type: str = None,
guild: discord.Guild = None,
):
self.name = name
self.default_setting = default_setting
self.image = image
@@ -282,7 +297,7 @@ class CaseType:
"default_setting": self.default_setting,
"image": self.image,
"case_str": self.case_str,
"audit_type": self.audit_type
"audit_type": self.audit_type,
}
await _conf.casetypes.set_raw(self.name, value=data)
@@ -302,7 +317,8 @@ class CaseType:
if not self.guild:
return False
return await _conf.guild(self.guild).casetypes.get_raw(
self.name, default=self.default_setting)
self.name, default=self.default_setting
)
async def set_enabled(self, enabled: bool):
"""
@@ -348,16 +364,11 @@ async def get_next_case_number(guild: discord.Guild) -> str:
The next case number
"""
cases = sorted(
(await _conf.guild(guild).get_raw("cases")),
key=lambda x: int(x),
reverse=True
)
cases = sorted((await _conf.guild(guild).get_raw("cases")), key=lambda x: int(x), reverse=True)
return str(int(cases[0]) + 1) if cases else "1"
async def get_case(case_number: int, guild: discord.Guild,
bot: Red) -> Case:
async def get_case(case_number: int, guild: discord.Guild, bot: Red) -> Case:
"""
Gets the case with the associated case number
@@ -384,9 +395,7 @@ async def get_case(case_number: int, guild: discord.Guild,
try:
case = await _conf.guild(guild).cases.get_raw(str(case_number))
except KeyError as e:
raise RuntimeError(
"That case does not exist for guild {}".format(guild.name)
) from e
raise RuntimeError("That case does not exist for guild {}".format(guild.name)) from e
mod_channel = await get_modlog_channel(guild)
return await Case.from_json(mod_channel, bot, case)
@@ -416,11 +425,17 @@ async def get_all_cases(guild: discord.Guild, bot: Red) -> List[Case]:
return case_list
async def create_case(bot: Red, guild: discord.Guild, created_at: datetime, action_type: str,
user: Union[discord.User, discord.Member],
moderator: discord.Member=None, reason: str=None,
until: datetime=None, channel: discord.TextChannel=None
) -> Union[Case, None]:
async def create_case(
bot: Red,
guild: discord.Guild,
created_at: datetime,
action_type: str,
user: Union[discord.User, discord.Member],
moderator: discord.Member = None,
reason: str = None,
until: datetime = None,
channel: discord.TextChannel = None,
) -> Union[Case, None]:
"""
Creates a new case
@@ -463,9 +478,7 @@ async def create_case(bot: Red, guild: discord.Guild, created_at: datetime, acti
try:
mod_channel = await get_modlog_channel(guild)
except RuntimeError:
raise RuntimeError(
"No mod log channel set for guild {}".format(guild.name)
)
raise RuntimeError("No mod log channel set for guild {}".format(guild.name))
case_type = await get_casetype(action_type, guild)
if case_type is None:
return None
@@ -475,9 +488,20 @@ async def create_case(bot: Red, guild: discord.Guild, created_at: datetime, acti
next_case_number = int(await get_next_case_number(guild))
case = Case(guild, int(created_at.timestamp()), action_type, user, moderator,
next_case_number, reason, int(until.timestamp()) if until else None,
channel, amended_by=None, modified_at=None, message=None)
case = Case(
guild,
int(created_at.timestamp()),
action_type,
user,
moderator,
next_case_number,
reason,
int(until.timestamp()) if until else None,
channel,
amended_by=None,
modified_at=None,
message=None,
)
if hasattr(mod_channel, "send"): # Not going to be the case for tests
use_embeds = await bot.embed_requested(mod_channel, guild.me)
case_content = await case.message_content(use_embeds)
@@ -490,7 +514,7 @@ async def create_case(bot: Red, guild: discord.Guild, created_at: datetime, acti
return case
async def get_casetype(name: str, guild: discord.Guild=None) -> Union[CaseType, None]:
async def get_casetype(name: str, guild: discord.Guild = None) -> Union[CaseType, None]:
"""
Gets the case type
@@ -516,7 +540,7 @@ async def get_casetype(name: str, guild: discord.Guild=None) -> Union[CaseType,
return None
async def get_all_casetypes(guild: discord.Guild=None) -> List[CaseType]:
async def get_all_casetypes(guild: discord.Guild = None) -> List[CaseType]:
"""
Get all currently registered case types
@@ -538,8 +562,8 @@ async def get_all_casetypes(guild: discord.Guild=None) -> List[CaseType]:
async def register_casetype(
name: str, default_setting: bool,
image: str, case_str: str, audit_type: str=None) -> CaseType:
name: str, default_setting: bool, image: str, case_str: str, audit_type: str = None
) -> CaseType:
"""
Registers a case type. If the case type exists and
there are differences between the values passed and
@@ -586,7 +610,7 @@ async def register_casetype(
raise ValueError("The 'image' is not a string!")
if not isinstance(case_str, str):
raise ValueError("The 'case_str' is not a string!")
if audit_type is not None:
if audit_type is not None:
if not isinstance(audit_type, str):
raise ValueError("The 'audit_type' is not a string!")
try:
@@ -665,8 +689,7 @@ async def register_casetypes(new_types: List[dict]) -> List[CaseType]:
return type_list
async def get_modlog_channel(guild: discord.Guild
) -> Union[discord.TextChannel, None]:
async def get_modlog_channel(guild: discord.Guild) -> Union[discord.TextChannel, None]:
"""
Get the current modlog channel
@@ -695,8 +718,9 @@ async def get_modlog_channel(guild: discord.Guild
return channel
async def set_modlog_channel(guild: discord.Guild,
channel: Union[discord.TextChannel, None]) -> bool:
async def set_modlog_channel(
guild: discord.Guild, channel: Union[discord.TextChannel, None]
) -> bool:
"""
Changes the modlog channel
@@ -713,9 +737,7 @@ async def set_modlog_channel(guild: discord.Guild,
`True` if successful
"""
await _conf.guild(guild).mod_log.set(
channel.id if hasattr(channel, "id") else None
)
await _conf.guild(guild).mod_log.set(channel.id if hasattr(channel, "id") else None)
return True
@@ -741,19 +763,19 @@ async def reset_cases(guild: discord.Guild) -> bool:
def _strfdelta(delta):
s = []
if delta.days:
ds = '%i day' % delta.days
ds = "%i day" % delta.days
if delta.days > 1:
ds += 's'
ds += "s"
s.append(ds)
hrs, rem = divmod(delta.seconds, 60*60)
hrs, rem = divmod(delta.seconds, 60 * 60)
if hrs:
hs = '%i hr' % hrs
hs = "%i hr" % hrs
if hrs > 1:
hs += 's'
hs += "s"
s.append(hs)
mins, secs = divmod(rem, 60)
if mins:
s.append('%i min' % mins)
s.append("%i min" % mins)
if secs:
s.append('%i sec' % secs)
return ' '.join(s)
s.append("%i sec" % secs)
return " ".join(s)

View File

@@ -10,8 +10,8 @@ from .utils import TYPE_CHECKING, NewType
if TYPE_CHECKING:
from .bot import Red
log = logging.getLogger('red.rpc')
JsonSerializable = NewType('JsonSerializable', dict)
log = logging.getLogger("red.rpc")
JsonSerializable = NewType("JsonSerializable", dict)
_rpc = JsonRpc(logger=log)
@@ -22,13 +22,13 @@ async def initialize(bot: "Red"):
global _rpc_server
app = Application(loop=bot.loop)
app.router.add_route('*', '/rpc', _rpc)
app.router.add_route("*", "/rpc", _rpc)
handler = app.make_handler()
_rpc_server = await bot.loop.create_server(handler, '127.0.0.1', 6133)
_rpc_server = await bot.loop.create_server(handler, "127.0.0.1", 6133)
log.debug('Created RPC _rpc_server listener.')
log.debug("Created RPC _rpc_server listener.")
def add_topic(topic_name: str):
@@ -77,10 +77,7 @@ def add_method(prefix, method):
method
MUST BE A COROUTINE OR OBJECT.
"""
_rpc.add_methods(
('', method),
prefix=prefix
)
_rpc.add_methods(("", method), prefix=prefix)
def clean_up():

View File

@@ -12,11 +12,13 @@ class SentryManager:
def __init__(self, logger: logging.Logger):
self.client = Client(
dsn=("https://62402161d4cd4ef18f83b16f3e22a020:9310ef55a502442598203205a84da2bb@"
"sentry.io/253983"),
dsn=(
"https://62402161d4cd4ef18f83b16f3e22a020:9310ef55a502442598203205a84da2bb@"
"sentry.io/253983"
),
release=__version__,
include_paths=['redbot'],
enable_breadcrumbs=False
include_paths=["redbot"],
enable_breadcrumbs=False,
)
self.handler = SentryHandler(self.client)
self.logger = logger

View File

@@ -1,4 +1,4 @@
__all__ = ['TYPE_CHECKING', 'NewType', 'safe_delete']
__all__ = ["TYPE_CHECKING", "NewType", "safe_delete"]
from pathlib import Path
import os
@@ -12,6 +12,7 @@ except ImportError:
try:
from typing import NewType
except ImportError:
def NewType(name, tp):
return type(name, (tp,), {})

View File

@@ -3,7 +3,7 @@ from typing import Tuple, List
from collections import namedtuple
Interval = Tuple[timedelta, int]
AntiSpamInterval = namedtuple('AntiSpamInterval', ['period', 'frequency'])
AntiSpamInterval = namedtuple("AntiSpamInterval", ["period", "frequency"])
class AntiSpam:
@@ -26,21 +26,18 @@ class AntiSpam:
(timedelta(seconds=5), 3),
(timedelta(minutes=1), 5),
(timedelta(hours=1), 10),
(timedelta(days=1), 24)
(timedelta(days=1), 24),
]
def __init__(self, intervals: List[Interval]):
self.__event_timestamps = []
_itvs = intervals if intervals else self.default_intervals
self.__intervals = [
AntiSpamInterval(*x) for x in _itvs
]
self.__intervals = [AntiSpamInterval(*x) for x in _itvs]
self.__discard_after = max([x.period for x in self.__intervals])
def __interval_check(self, interval: AntiSpamInterval):
return len(
[t for t in self.__event_timestamps
if (t + interval.period) > datetime.utcnow()]
[t for t in self.__event_timestamps if (t + interval.period) > datetime.utcnow()]
) >= interval.frequency
@property
@@ -57,6 +54,5 @@ class AntiSpam:
"""
self.__event_timestamps.append(datetime.utcnow())
self.__event_timestamps = [
t for t in self.__event_timestamps
if t + self.__discard_after > datetime.utcnow()
t for t in self.__event_timestamps if t + self.__discard_after > datetime.utcnow()
]

View File

@@ -1,6 +1,7 @@
import itertools
from typing import Sequence, Iterator
def error(text: str) -> str:
"""Get text prefixed with an error emoji.
@@ -66,7 +67,7 @@ def bold(text: str) -> str:
return "**{}**".format(text)
def box(text: str, lang: str="") -> str:
def box(text: str, lang: str = "") -> str:
"""Get the given text in a code block.
Parameters
@@ -120,7 +121,7 @@ def italics(text: str) -> str:
return "*{}*".format(text)
def bordered(*columns: Sequence[str], ascii_border: bool=False) -> str:
def bordered(*columns: Sequence[str], ascii_border: bool = False) -> str:
"""Get two blocks of text in a borders.
Note
@@ -141,18 +142,18 @@ def bordered(*columns: Sequence[str], ascii_border: bool=False) -> str:
"""
borders = {
'TL': '-' if ascii_border else '', # Top-left
'TR': '-' if ascii_border else '', # Top-right
'BL': '-' if ascii_border else '', # Bottom-left
'BR': '-' if ascii_border else '', # Bottom-right
'HZ': '-' if ascii_border else '', # Horizontal
'VT': '|' if ascii_border else '', # Vertical
"TL": "-" if ascii_border else "", # Top-left
"TR": "-" if ascii_border else "", # Top-right
"BL": "-" if ascii_border else "", # Bottom-left
"BR": "-" if ascii_border else "", # Bottom-right
"HZ": "-" if ascii_border else "", # Horizontal
"VT": "|" if ascii_border else "", # Vertical
}
sep = ' ' * 4 # Separator between boxes
sep = " " * 4 # Separator between boxes
widths = tuple(max(len(row) for row in column) + 9 for column in columns) # width of each col
colsdone = [False] * len(columns) # whether or not each column is done
lines = [sep.join('{TL}' + '{HZ}'*width + '{TR}' for width in widths)]
lines = [sep.join("{TL}" + "{HZ}" * width + "{TR}" for width in widths)]
for line in itertools.zip_longest(*columns):
row = []
@@ -162,36 +163,38 @@ def bordered(*columns: Sequence[str], ascii_border: bool=False) -> str:
if column is None:
if not done:
# bottom border of column
column = '{HZ}' * width
row.append('{BL}' + column + '{BR}')
column = "{HZ}" * width
row.append("{BL}" + column + "{BR}")
colsdone[colidx] = True # mark column as done
else:
# leave empty
row.append(' ' * (width + 2))
row.append(" " * (width + 2))
else:
column += ' ' * (width - len(column)) # append padded spaces
row.append('{VT}' + column + '{VT}')
column += " " * (width - len(column)) # append padded spaces
row.append("{VT}" + column + "{VT}")
lines.append(sep.join(row))
final_row = []
for width, done in zip(widths, colsdone):
if not done:
final_row.append('{BL}' + '{HZ}' * width + '{BR}')
final_row.append("{BL}" + "{HZ}" * width + "{BR}")
else:
final_row.append(' ' * (width + 2))
final_row.append(" " * (width + 2))
lines.append(sep.join(final_row))
return "\n".join(lines).format(**borders)
def pagify(text: str,
delims: Sequence[str]=["\n"],
*,
priority: bool=False,
escape_mass_mentions: bool=True,
shorten_by: int=8,
page_length: int=2000) -> Iterator[str]:
def pagify(
text: str,
delims: Sequence[str] = ["\n"],
*,
priority: bool = False,
escape_mass_mentions: bool = True,
shorten_by: int = 8,
page_length: int = 2000
) -> Iterator[str]:
"""Generate multiple pages from the given text.
Note
@@ -232,10 +235,10 @@ def pagify(text: str,
while len(in_text) > page_length:
this_page_len = page_length
if escape_mass_mentions:
this_page_len -= (in_text.count("@here", 0, page_length) +
in_text.count("@everyone", 0, page_length))
closest_delim = (in_text.rfind(d, 1, this_page_len)
for d in delims)
this_page_len -= (
in_text.count("@here", 0, page_length) + in_text.count("@everyone", 0, page_length)
)
closest_delim = (in_text.rfind(d, 1, this_page_len) for d in delims)
if priority:
closest_delim = next((x for x in closest_delim if x > 0), -1)
else:
@@ -290,8 +293,7 @@ def underline(text: str) -> str:
return "__{}__".format(text)
def escape(text: str, *, mass_mentions: bool=False,
formatting: bool=False) -> str:
def escape(text: str, *, mass_mentions: bool = False, formatting: bool = False) -> str:
"""Get text with all mass mentions or markdown escaped.
Parameters
@@ -313,8 +315,7 @@ def escape(text: str, *, mass_mentions: bool=False,
text = text.replace("@everyone", "@\u200beveryone")
text = text.replace("@here", "@\u200bhere")
if formatting:
text = (text.replace("`", "\\`")
.replace("*", "\\*")
.replace("_", "\\_")
.replace("~", "\\~"))
text = (
text.replace("`", "\\`").replace("*", "\\*").replace("_", "\\_").replace("~", "\\~")
)
return text

View File

@@ -28,7 +28,7 @@ class DataConverter:
The file isn't valid JSON
"""
try:
with file_path.open(mode='r', encoding='utf-8') as f:
with file_path.open(mode="r", encoding="utf-8") as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
raise

View File

@@ -10,10 +10,14 @@ import discord
from redbot.core import commands
async def menu(ctx: commands.Context, pages: list,
controls: dict,
message: discord.Message=None, page: int=0,
timeout: float=30.0):
async def menu(
ctx: commands.Context,
pages: list,
controls: dict,
message: discord.Message = None,
page: int = 0,
timeout: float = 30.0,
):
"""
An emoji-based menu
@@ -48,8 +52,10 @@ async def menu(ctx: commands.Context, pages: list,
RuntimeError
If either of the notes above are violated
"""
if not all(isinstance(x, discord.Embed) for x in pages) and\
not all(isinstance(x, str) for x in pages):
if (
not all(isinstance(x, discord.Embed) for x in pages)
and not all(isinstance(x, str) for x in pages)
):
raise RuntimeError("All pages must be of the same type")
for key, value in controls.items():
if not asyncio.iscoroutinefunction(value):
@@ -70,15 +76,10 @@ async def menu(ctx: commands.Context, pages: list,
await message.edit(content=current_page)
def react_check(r, u):
return u == ctx.author and r.message.id == message.id and \
str(r.emoji) in controls.keys()
return u == ctx.author and r.message.id == message.id and str(r.emoji) in controls.keys()
try:
react, user = await ctx.bot.wait_for(
"reaction_add",
check=react_check,
timeout=timeout
)
react, user = await ctx.bot.wait_for("reaction_add", check=react_check, timeout=timeout)
except asyncio.TimeoutError:
try:
await message.clear_reactions()
@@ -87,14 +88,18 @@ async def menu(ctx: commands.Context, pages: list,
await message.remove_reaction(key, ctx.bot.user)
return None
return await controls[react.emoji](ctx, pages, controls,
message, page,
timeout, react.emoji)
return await controls[react.emoji](ctx, pages, controls, message, page, timeout, react.emoji)
async def next_page(ctx: commands.Context, pages: list,
controls: dict, message: discord.Message, page: int,
timeout: float, emoji: str):
async def next_page(
ctx: commands.Context,
pages: list,
controls: dict,
message: discord.Message,
page: int,
timeout: float,
emoji: str,
):
perms = message.channel.permissions_for(ctx.guild.me)
if perms.manage_messages: # Can manage messages, so remove react
try:
@@ -105,13 +110,18 @@ async def next_page(ctx: commands.Context, pages: list,
page = 0 # Loop around to the first item
else:
page = page + 1
return await menu(ctx, pages, controls, message=message,
page=page, timeout=timeout)
return await menu(ctx, pages, controls, message=message, page=page, timeout=timeout)
async def prev_page(ctx: commands.Context, pages: list,
controls: dict, message: discord.Message, page: int,
timeout: float, emoji: str):
async def prev_page(
ctx: commands.Context,
pages: list,
controls: dict,
message: discord.Message,
page: int,
timeout: float,
emoji: str,
):
perms = message.channel.permissions_for(ctx.guild.me)
if perms.manage_messages: # Can manage messages, so remove react
try:
@@ -122,20 +132,21 @@ async def prev_page(ctx: commands.Context, pages: list,
next_page = len(pages) - 1 # Loop around to the last item
else:
next_page = page - 1
return await menu(ctx, pages, controls, message=message,
page=next_page, timeout=timeout)
return await menu(ctx, pages, controls, message=message, page=next_page, timeout=timeout)
async def close_menu(ctx: commands.Context, pages: list,
controls: dict, message: discord.Message, page: int,
timeout: float, emoji: str):
async def close_menu(
ctx: commands.Context,
pages: list,
controls: dict,
message: discord.Message,
page: int,
timeout: float,
emoji: str,
):
if message:
await message.delete()
return None
DEFAULT_CONTROLS = {
"": prev_page,
"": close_menu,
"": next_page
}
DEFAULT_CONTROLS = {"": prev_page, "": close_menu, "": next_page}

View File

@@ -8,8 +8,7 @@ from redbot.core import Config
from redbot.core.bot import Red
async def mass_purge(messages: List[discord.Message],
channel: discord.TextChannel):
async def mass_purge(messages: List[discord.Message], channel: discord.TextChannel):
"""Bulk delete messages from a channel.
If more than 100 messages are supplied, the bot will delete 100 messages at
@@ -80,24 +79,23 @@ def get_audit_reason(author: discord.Member, reason: str = None):
The formatted audit log reason.
"""
return \
"Action requested by {} (ID {}). Reason: {}".format(author, author.id, reason) if reason else \
"Action requested by {} (ID {}).".format(author, author.id)
return "Action requested by {} (ID {}). Reason: {}".format(
author, author.id, reason
) if reason else "Action requested by {} (ID {}).".format(
author, author.id
)
async def is_allowed_by_hierarchy(bot: Red,
settings: Config,
guild: discord.Guild,
mod: discord.Member,
user: discord.Member):
async def is_allowed_by_hierarchy(
bot: Red, settings: Config, guild: discord.Guild, mod: discord.Member, user: discord.Member
):
if not await settings.guild(guild).respect_hierarchy():
return True
is_special = mod == guild.owner or await bot.is_owner(mod)
return mod.top_role.position > user.top_role.position or is_special
async def is_mod_or_superior(
bot: Red, obj: Union[discord.Message, discord.Member, discord.Role]):
async def is_mod_or_superior(bot: Red, obj: Union[discord.Message, discord.Member, discord.Role]):
"""Check if an object has mod or superior permissions.
If a message is passed, its author's permissions are checked. If a role is
@@ -129,7 +127,7 @@ async def is_mod_or_superior(
elif isinstance(obj, discord.Role):
pass
else:
raise TypeError('Only messages, members or roles may be passed')
raise TypeError("Only messages, members or roles may be passed")
server = obj.guild
admin_role_id = await bot.db.guild(server).admin_role()
@@ -168,26 +166,27 @@ def strfdelta(delta: timedelta):
"""
s = []
if delta.days:
ds = '%i day' % delta.days
ds = "%i day" % delta.days
if delta.days > 1:
ds += 's'
ds += "s"
s.append(ds)
hrs, rem = divmod(delta.seconds, 60*60)
hrs, rem = divmod(delta.seconds, 60 * 60)
if hrs:
hs = '%i hr' % hrs
hs = "%i hr" % hrs
if hrs > 1:
hs += 's'
hs += "s"
s.append(hs)
mins, secs = divmod(rem, 60)
if mins:
s.append('%i min' % mins)
s.append("%i min" % mins)
if secs:
s.append('%i sec' % secs)
return ' '.join(s)
s.append("%i sec" % secs)
return " ".join(s)
async def is_admin_or_superior(
bot: Red, obj: Union[discord.Message, discord.Member, discord.Role]):
bot: Red, obj: Union[discord.Message, discord.Member, discord.Role]
):
"""Same as `is_mod_or_superior` except for admin permissions.
If a message is passed, its author's permissions are checked. If a role is
@@ -219,7 +218,7 @@ async def is_admin_or_superior(
elif isinstance(obj, discord.Role):
pass
else:
raise TypeError('Only messages, members or roles may be passed')
raise TypeError("Only messages, members or roles may be passed")
server = obj.guild
admin_role_id = await bot.db.guild(server).admin_role()

View File

@@ -16,10 +16,7 @@ class TunnelMeta(type):
"""
def __call__(cls, *args, **kwargs):
lockout_tuple = (
(kwargs.get('sender'), kwargs.get('origin')),
kwargs.get('recipient')
)
lockout_tuple = ((kwargs.get("sender"), kwargs.get("origin")), kwargs.get("recipient"))
if lockout_tuple in _instances:
return _instances[lockout_tuple]
@@ -30,13 +27,8 @@ class TunnelMeta(type):
while True:
try:
if not (
any(
lockout_tuple[0] == x[0]
for x in _instances.keys()
) or any(
lockout_tuple[1] == x[1]
for x in _instances.keys()
)
any(lockout_tuple[0] == x[0] for x in _instances.keys())
or any(lockout_tuple[1] == x[1] for x in _instances.keys())
):
# if this isn't temporarily stored, the weakref dict
# will discard this before the return statement,
@@ -70,10 +62,9 @@ class Tunnel(metaclass=TunnelMeta):
The user on the other end of the tunnel
"""
def __init__(self, *,
sender: discord.Member,
origin: discord.TextChannel,
recipient: discord.User):
def __init__(
self, *, sender: discord.Member, origin: discord.TextChannel, recipient: discord.User
):
self.sender = sender
self.origin = origin
self.recipient = recipient
@@ -81,11 +72,8 @@ class Tunnel(metaclass=TunnelMeta):
async def react_close(self, *, uid: int, message: str):
send_to = self.origin if uid == self.sender.id else self.sender
closer = next(filter(
lambda x: x.id == uid, (self.sender, self.recipient)), None)
await send_to.send(
message.format(closer=closer)
)
closer = next(filter(lambda x: x.id == uid, (self.sender, self.recipient)), None)
await send_to.send(message.format(closer=closer))
@property
def members(self):
@@ -97,8 +85,8 @@ class Tunnel(metaclass=TunnelMeta):
@staticmethod
async def message_forwarder(
*, destination: discord.abc.Messageable,
content: str=None, embed=None, files=[]) -> List[discord.Message]:
*, destination: discord.abc.Messageable, content: str = None, embed=None, files=[]
) -> List[discord.Message]:
"""
This does the actual sending, use this instead of a full tunnel
if you are using command initiated reactions instead of persistent
@@ -131,18 +119,13 @@ class Tunnel(metaclass=TunnelMeta):
files = files if files else None
if content:
for page in pagify(content):
rets.append(
await destination.send(
page, files=files, embed=embed)
)
rets.append(await destination.send(page, files=files, embed=embed))
if files:
del files
if embed:
del embed
elif embed or files:
rets.append(
await destination.send(files=files, embed=embed)
)
rets.append(await destination.send(files=files, embed=embed))
return rets
@staticmethod
@@ -172,15 +155,12 @@ class Tunnel(metaclass=TunnelMeta):
size += sys.getsizeof(_fp)
if size > max_size:
return []
files.append(
discord.File(_fp, filename=a.filename)
)
files.append(discord.File(_fp, filename=a.filename))
return files
async def communicate(self, *,
message: discord.Message,
topic: str=None,
skip_message_content: bool=False):
async def communicate(
self, *, message: discord.Message, topic: str = None, skip_message_content: bool = False
):
"""
Forwards a message.
@@ -208,18 +188,15 @@ class Tunnel(metaclass=TunnelMeta):
the bot can't upload at the origin channel
or can't add reactions there.
"""
if message.channel == self.origin \
and message.author == self.sender:
if message.channel == self.origin and message.author == self.sender:
send_to = self.recipient
elif message.author == self.recipient \
and isinstance(message.channel, discord.DMChannel):
elif message.author == self.recipient and isinstance(message.channel, discord.DMChannel):
send_to = self.origin
else:
return None
if not skip_message_content:
content = "\n".join((topic, message.content)) if topic \
else message.content
content = "\n".join((topic, message.content)) if topic else message.content
else:
content = topic
@@ -234,11 +211,7 @@ class Tunnel(metaclass=TunnelMeta):
else:
attach = []
rets = await self.message_forwarder(
destination=send_to,
content=content,
files=attach
)
rets = await self.message_forwarder(destination=send_to, content=content, files=attach)
await message.add_reaction("\N{WHITE HEAVY CHECK MARK}")
await message.add_reaction("\N{NEGATIVE SQUARED CROSS MARK}")