[V3 Everything] Package bot and write setup scripts (#964)

Ya'll are gonna hate me.

* Initial modifications

* Add initial setup.py

* working setup py help

* Modify setup file to package stuff

* Move a bunch of shit and fix imports

* Fix or skip tests

* Must add init files for find_packages to work

* Move main to scripts folder and rename

* Add shebangs

* Copy over translation files

* WORKING PIP INSTALL

* add dependency information

* Hardcoded version for now, will need to figure out a better way to do this

* OKAY ITS FINALLY FUCKING WORKING

* Add this guy

* Fix stuff

* Change readme to rst

* Remove double sentry opt in

* Oopsie

* Fix this thing

* Aaaand fix test

* Aaaand fix test

* Fix core cog importing and default cog install path

* Adjust readme

* change instance name from optional to required

* Ayyy let's do more dependency injection
This commit is contained in:
Will
2017-09-08 23:14:32 -04:00
committed by GitHub
parent 6b1fc786ee
commit d69fd63da7
85 changed files with 451 additions and 255 deletions

7
redbot/core/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
import pkg_resources
from .config import Config
__all__ = ["Config", "__version__"]
__version__ = version = pkg_resources.require("Red-DiscordBot")[0].version

554
redbot/core/bank.py Normal file
View File

@@ -0,0 +1,554 @@
import datetime
import os
from typing import Union, List
import discord
from redbot.core import Config
__all__ = ["Account", "get_balance", "set_balance", "withdraw_credits", "deposit_credits",
"can_spend", "transfer_credits", "wipe_bank", "get_guild_accounts",
"get_global_accounts", "get_account", "is_global", "set_global",
"get_bank_name", "set_bank_name", "get_currency_name", "set_currency_name",
"get_default_balance", "set_default_balance"]
_DEFAULT_GLOBAL = {
"is_global": False,
"bank_name": "Twentysix bank",
"currency": "credits",
"default_balance": 100
}
_DEFAULT_GUILD = {
"bank_name": "Twentysix bank",
"currency": "credits",
"default_balance": 100
}
_DEFAULT_MEMBER = {
"name": "",
"balance": 0,
"created_at": 0
}
_DEFAULT_USER = _DEFAULT_MEMBER
_bank_type = type("Bank", (object,), {})
class Account:
"""A single account. This class should ONLY be instantiated by the bank itself."""
def __init__(self, name: str, balance: int, created_at: datetime.datetime):
self.name = name
self.balance = balance
self.created_at = created_at
def _register_defaults():
_conf.register_global(**_DEFAULT_GLOBAL)
_conf.register_guild(**_DEFAULT_GUILD)
_conf.register_member(**_DEFAULT_MEMBER)
_conf.register_user(**_DEFAULT_USER)
if not os.environ.get('BUILDING_DOCS'):
_conf = Config.get_conf(_bank_type(), 384734293238749, force_registration=True)
_register_defaults()
def _encoded_current_time() -> int:
"""
Encoded current timestamp in UTC.
:return:
"""
now = datetime.datetime.utcnow()
return _encode_time(now)
def _encode_time(time: datetime.datetime) -> int:
"""
Goes from datetime object to serializable int.
:param time:
:return:
"""
ret = int(time.timestamp())
return ret
def _decode_time(time: int) -> datetime.datetime:
"""
Returns decoded timestamp in UTC.
:param time:
:return:
"""
return datetime.datetime.utcfromtimestamp(time)
async def get_balance(member: discord.Member) -> int:
"""
Gets the current balance of a member.
:param discord.Member member:
The member whose balance to check.
:return:
The member's balance
:rtype:
int
"""
acc = await get_account(member)
return acc.balance
async def can_spend(member: discord.Member, amount: int) -> bool:
"""
Determines if a member can spend the given amount.
:param discord.Member member:
The member wanting to spend.
:param int amount:
The amount the member wants to spend.
:return:
:code:`True` if the member has a sufficient balance to spend the amount, else :code:`False`.
:rtype:
bool
"""
if _invalid_amount(amount):
return False
return await get_balance(member) > amount
async def set_balance(member: discord.Member, amount: int) -> int:
"""
Sets an account balance.
May raise ValueError if amount is invalid.
:param discord.Member member:
The member whose balance to set.
:param int amount:
The amount to set the balance to.
:return:
New account balance.
:rtype:
int
:raises ValueError:
If attempting to set the balance to a negative number
"""
if amount < 0:
raise ValueError("Not allowed to have negative balance.")
if await is_global():
group = _conf.user(member)
else:
group = _conf.member(member)
await group.balance.set(amount)
if await group.created_at() == 0:
time = _encoded_current_time()
await group.created_at.set(time)
if await group.name() == "":
await group.name.set(member.display_name)
return amount
def _invalid_amount(amount: int) -> bool:
return amount <= 0
async def withdraw_credits(member: discord.Member, amount: int) -> int:
"""
Removes a certain amount of credits from an account.
May raise ValueError if the amount is invalid or if the account has
insufficient funds.
:param discord.Member member:
The member to withdraw credits from.
:param int amount:
The amount to withdraw.
:return:
New account balance.
:rtype:
int
:raises ValueError:
if the withdrawal amount is invalid or if the account has insufficient funds
"""
if _invalid_amount(amount):
raise ValueError("Invalid withdrawal amount {} <= 0".format(amount))
bal = await get_balance(member)
if amount > bal:
raise ValueError("Insufficient funds {} > {}".format(amount, bal))
return await set_balance(member, bal - amount)
async def deposit_credits(member: discord.Member, amount: int) -> int:
"""
Adds a given amount of credits to an account.
May raise ValueError if the amount is invalid.
:param discord.Member member:
The member to deposit credits to.
:param int amount:
The amount to deposit.
:return:
The new balance
:rtype:
int
:raises ValueError:
If the deposit amount is invalid.
"""
if _invalid_amount(amount):
raise ValueError("Invalid withdrawal amount {} <= 0".format(amount))
bal = await get_balance(member)
return await set_balance(member, amount + bal)
async def transfer_credits(from_: discord.Member, to: discord.Member, amount: int):
"""
Transfers a given amount of credits from one account to another.
May raise ValueError if the amount is invalid or if the :code:`from_`
account has insufficient funds.
:param discord.Member from_:
The member to transfer from.
:param discord.Member to:
The member to transfer to.
:param int amount:
The amount to transfer.
:return:
The new balance.
:rtype:
int
:raises ValueError:
If the amount is invalid or if :code:`from_` has insufficient funds.
"""
if _invalid_amount(amount):
raise ValueError("Invalid transfer amount {} <= 0".format(amount))
await withdraw_credits(from_, amount)
return await deposit_credits(to, amount)
async def wipe_bank(user: Union[discord.User, discord.Member]):
"""
Deletes all accounts from the bank.
.. important::
A member is required if the bank is currently guild specific.
:param user:
A user to be used in clearing the bank, this is required for technical
reasons and it does not matter which user/member is used.
:type user:
discord.User or discord.Member
"""
if await is_global():
await _conf.user(user).clear()
else:
await _conf.member(user).clear()
async def get_guild_accounts(guild: discord.Guild) -> List[Account]:
"""
Gets all account data for the given guild.
May raise RuntimeError if the bank is currently global.
:param discord.Guild guild:
The guild to get accounts for.
:return:
A generator for all guild accounts.
:rtype:
generator
:raises RuntimeError:
If the bank is global.
"""
if is_global():
raise RuntimeError("The bank is currently global.")
ret = []
accs = await _conf.member(guild.owner).all_from_kind()
for user_id, acc in accs.items():
acc_data = acc.copy() # There ya go kowlin
acc_data['created_at'] = _decode_time(acc_data['created_at'])
ret.append(Account(**acc_data))
return ret
async def get_global_accounts(user: discord.User) -> List[Account]:
"""
Gets all global account data.
May raise RuntimeError if the bank is currently guild specific.
:param discord.User user:
A user to be used for getting accounts.
:return:
A generator of all global accounts.
:rtype:
generator
:raises RuntimeError:
If the bank is guild specific.
"""
if not is_global():
raise RuntimeError("The bank is not currently global.")
ret = []
accs = await _conf.user(user).all_from_kind() # this is a dict of user -> acc
for user_id, acc in accs.items():
acc_data = acc.copy()
acc_data['created_at'] = _decode_time(acc_data['created_at'])
ret.append(Account(**acc_data))
return ret
async def get_account(member: Union[discord.Member, discord.User]) -> Account:
"""
Gets the appropriate account for the given user or member. A member is
required if the bank is currently guild specific.
:param member:
The user whose account to get.
:type member:
discord.User or discord.Member
:return:
The user's account.
:rtype:
:py:class:`Account`
"""
if await is_global():
acc_data = (await _conf.user(member)()).copy()
default = _DEFAULT_USER.copy()
else:
acc_data = (await _conf.member(member)()).copy()
default = _DEFAULT_MEMBER.copy()
if acc_data == {}:
acc_data = default
acc_data['name'] = member.display_name
try:
acc_data['balance'] = await get_default_balance(member.guild)
except AttributeError:
acc_data['balance'] = await get_default_balance()
acc_data['created_at'] = _decode_time(acc_data['created_at'])
return Account(**acc_data)
async def is_global() -> bool:
"""
Determines if the bank is currently global.
:return:
:code:`True` if the bank is global, otherwise :code:`False`.
:rtype:
bool
"""
return await _conf.is_global()
async def set_global(global_: bool, user: Union[discord.User, discord.Member]) -> bool:
"""
Sets global status of the bank, requires the user parameter for technical reasons.
.. important::
All accounts are reset when you switch!
:param global_:
:code:`True` will set bank to global mode.
:param user:
Must be a Member object if changing TO global mode.
:type user:
discord.User or discord.Member
:return:
New bank mode, :code:`True` is global.
:rtype:
bool
:raises RuntimeError:
If bank is becoming global and :py:class:`discord.Member` was not provided.
"""
if (await is_global()) is global_:
return global_
if is_global():
await _conf.user(user).clear_all()
elif isinstance(user, discord.Member):
await _conf.member(user).clear_all()
else:
raise RuntimeError("You must provide a member if you're changing to global"
" bank mode.")
await _conf.is_global.set(global_)
return global_
async def get_bank_name(guild: discord.Guild=None) -> str:
"""
Gets the current bank name. If the bank is guild-specific the
guild parameter is required.
May raise RuntimeError if guild is missing and required.
:param discord.Guild guild:
The guild to get the bank name for (required if bank is guild-specific).
:return:
The bank's name.
:rtype:
str
:raises RuntimeError:
If the bank is guild-specific and guild was not provided.
"""
if await is_global():
return await _conf.bank_name()
elif guild is not None:
return await _conf.guild(guild).bank_name()
else:
raise RuntimeError("Guild parameter is required and missing.")
async def set_bank_name(name: str, guild: discord.Guild=None) -> str:
"""
Sets the bank name, if bank is guild specific the guild parameter is
required.
May throw RuntimeError if guild is required and missing.
:param str name:
The new name for the bank.
:param discord.Guild guild:
The guild to set the bank name for (required if bank is guild-specific).
:return:
The new name for the bank.
:rtype:
str
:raises RuntimeError:
If the bank is guild-specific and guild was not provided.
"""
if await is_global():
await _conf.bank_name.set(name)
elif guild is not None:
await _conf.guild(guild).bank_name.set(name)
else:
raise RuntimeError("Guild must be provided if setting the name of a guild"
"-specific bank.")
return name
async def get_currency_name(guild: discord.Guild=None) -> str:
"""
Gets the currency name of the bank. The guild parameter is required if
the bank is guild-specific.
May raise RuntimeError if the guild is missing and required.
:param discord.Guild guild:
The guild to get the currency name for (required if bank is guild-specific).
:return:
The currency name.
:rtype:
str
:raises RuntimeError:
If the bank is guild-specific and guild was not provided.
"""
if await is_global():
return await _conf.currency()
elif guild is not None:
return await _conf.guild(guild).currency()
else:
raise RuntimeError("Guild must be provided.")
async def set_currency_name(name: str, guild: discord.Guild=None) -> str:
"""
Sets the currency name for the bank, if bank is guild specific the
guild parameter is required.
May raise RuntimeError if guild is missing and required.
:param str name:
The new name for the currency.
:param discord.Guild guild:
The guild to set the currency name for (required if bank is guild-specific).
:return:
The new name for the currency.
:rtype:
str
:raises RuntimeError:
If the bank is guild-specific and guild was not provided.
"""
if await is_global():
await _conf.currency.set(name)
elif guild is not None:
await _conf.guild(guild).currency.set(name)
else:
raise RuntimeError("Guild must be provided if setting the currency"
" name of a guild-specific bank.")
return name
async def get_default_balance(guild: discord.Guild=None) -> int:
"""
Gets the current default balance amount. If the bank is guild-specific
you must pass guild.
May raise RuntimeError if guild is missing and required.
:param discord.Guild guild:
The guild to get the default balance for (required if bank is guild-specific).
:return:
The bank's default balance.
:rtype:
str
:raises RuntimeError:
If the bank is guild-specific and guild was not provided.
"""
if await is_global():
return await _conf.default_balance()
elif guild is not None:
return await _conf.guild(guild).default_balance()
else:
raise RuntimeError("Guild is missing and required!")
async def set_default_balance(amount: int, guild: discord.Guild=None) -> int:
"""
Sets the default balance amount. Guild is required if the bank is
guild-specific.
May raise RuntimeError if guild is missing and required.
May raise ValueError if amount is invalid.
:param int amount:
The new default balance.
:param discord.Guild guild:
The guild to set the default balance for (required if bank is guild-specific).
:return:
The new default balance.
:rtype:
str
:raises RuntimeError:
If the bank is guild-specific and guild was not provided.
:raises ValueError:
If the amount is invalid.
"""
amount = int(amount)
if amount < 0:
raise ValueError("Amount must be greater than zero.")
if await is_global():
await _conf.default_balance.set(amount)
elif guild is not None:
await _conf.guild(guild).default_balance.set(amount)
else:
raise RuntimeError("Guild is missing and required.")
return amount

190
redbot/core/bot.py Normal file
View File

@@ -0,0 +1,190 @@
import asyncio
import os
from collections import Counter
from enum import Enum
from importlib.machinery import ModuleSpec
from pathlib import Path
import discord
from discord.ext import commands
from discord.ext.commands import GroupMixin
from .cog_manager import CogManager
from . import Config
from . import i18n
class Red(commands.Bot):
def __init__(self, cli_flags, bot_dir: Path=Path.cwd(), **kwargs):
self._shutdown_mode = ExitCodes.CRITICAL
self.db = Config.get_core_conf(force_registration=True)
self._co_owners = cli_flags.co_owner
self.db.register_global(
token=None,
prefix=[],
packages=[],
owner=None,
whitelist=[],
blacklist=[],
enable_sentry=None,
locale='en'
)
self.db.register_guild(
prefix=[],
whitelist=[],
blacklist=[],
admin_role=None,
mod_role=None
)
async def prefix_manager(bot, message):
if not cli_flags.prefix:
global_prefix = await bot.db.prefix()
else:
global_prefix = cli_flags.prefix
if message.guild is None:
return global_prefix
server_prefix = await bot.db.guild(message.guild).prefix()
return server_prefix if server_prefix else global_prefix
if "command_prefix" not in kwargs:
kwargs["command_prefix"] = prefix_manager
if cli_flags.owner and "owner_id" not in kwargs:
kwargs["owner_id"] = cli_flags.owner
if "owner_id" not in kwargs:
loop = asyncio.get_event_loop()
loop.run_until_complete(self._dict_abuse(kwargs))
self.counter = Counter()
self.uptime = None
self.main_dir = bot_dir
self.cog_mgr = CogManager(paths=(str(self.main_dir / 'cogs'),))
super().__init__(**kwargs)
async def _dict_abuse(self, indict):
"""
Please blame <@269933075037814786> for this.
:param indict:
:return:
"""
indict['owner_id'] = await self.db.owner()
i18n.set_locale(await self.db.locale())
async def is_owner(self, user):
if user.id in self._co_owners:
return True
return await super().is_owner(user)
async def send_cmd_help(self, ctx):
if ctx.invoked_subcommand:
pages = await self.formatter.format_help_for(ctx, ctx.invoked_subcommand)
for page in pages:
await ctx.send(page)
else:
pages = await self.formatter.format_help_for(ctx, ctx.command)
for page in pages:
await ctx.send(page)
async def shutdown(self, *, restart=False):
"""Gracefully quits Red with exit code 0
If restart is True, the exit code will be 26 instead
Upon receiving that exit code, the launcher restarts Red"""
if not restart:
self._shutdown_mode = ExitCodes.SHUTDOWN
else:
self._shutdown_mode = ExitCodes.RESTART
await self.logout()
def list_packages(self):
"""Lists packages present in the cogs the folder"""
return os.listdir("cogs")
async def save_packages_status(self, packages):
await self.db.packages.set(packages)
async def add_loaded_package(self, pkg_name: str):
curr_pkgs = await self.db.packages()
if pkg_name not in curr_pkgs:
curr_pkgs.append(pkg_name)
await self.save_packages_status(curr_pkgs)
async def remove_loaded_package(self, pkg_name: str):
curr_pkgs = await self.db.packages()
if pkg_name in curr_pkgs:
await self.save_packages_status([p for p in curr_pkgs if p != pkg_name])
def load_extension(self, spec: ModuleSpec):
name = spec.name.split('.')[-1]
if name in self.extensions:
return
lib = spec.loader.load_module()
if not hasattr(lib, 'setup'):
del lib
raise discord.ClientException('extension does not have a setup function')
lib.setup(self)
self.extensions[name] = lib
def unload_extension(self, name):
lib = self.extensions.get(name)
if lib is None:
return
lib_name = lib.__name__ # Thank you
# find all references to the module
# remove the cogs registered from the module
for cogname, cog in self.cogs.copy().items():
if cog.__module__.startswith(lib_name):
self.remove_cog(cogname)
# first remove all the commands from the module
for cmd in self.all_commands.copy().values():
if cmd.module.startswith(lib_name):
if isinstance(cmd, GroupMixin):
cmd.recursively_remove_all_commands()
self.remove_command(cmd.name)
# then remove all the listeners from the module
for event_list in self.extra_events.copy().values():
remove = []
for index, event in enumerate(event_list):
if event.__module__.startswith(lib_name):
remove.append(index)
for index in reversed(remove):
del event_list[index]
try:
func = getattr(lib, 'teardown')
except AttributeError:
pass
else:
try:
func(self)
except:
pass
finally:
# finally remove the import..
del lib
del self.extensions[name]
# del sys.modules[name]
class ExitCodes(Enum):
CRITICAL = 1
SHUTDOWN = 0
RESTART = 26

78
redbot/core/checks.py Normal file
View File

@@ -0,0 +1,78 @@
import discord
from discord.ext import commands
def is_owner(**kwargs):
async def check(ctx):
return await ctx.bot.is_owner(ctx.author, **kwargs)
return commands.check(check)
async def check_permissions(ctx, perms):
if await ctx.bot.is_owner(ctx.author):
return True
elif not perms:
return False
resolved = ctx.channel.permissions_for(ctx.author)
return all(getattr(resolved, name, None) == value for name, value in perms.items())
def mod_or_permissions(**perms):
async def predicate(ctx):
has_perms_or_is_owner = await check_permissions(ctx, perms)
if ctx.guild is None:
return has_perms_or_is_owner
author = ctx.author
settings = ctx.bot.db.guild(ctx.guild)
mod_role_id = await settings.mod_role()
admin_role_id = await settings.admin_role()
mod_role = discord.utils.get(ctx.guild.roles, id=mod_role_id)
admin_role = discord.utils.get(ctx.guild.roles, id=admin_role_id)
is_staff = mod_role in author.roles or admin_role in author.roles
is_guild_owner = author == ctx.guild.owner
return is_staff or has_perms_or_is_owner or is_guild_owner
return commands.check(predicate)
def admin_or_permissions(**perms):
async def predicate(ctx):
has_perms_or_is_owner = await check_permissions(ctx, perms)
if ctx.guild is None:
return has_perms_or_is_owner
author = ctx.author
is_guild_owner = author == ctx.guild.owner
admin_role_id = await ctx.bot.db.guild(ctx.guild).admin_role()
admin_role = discord.utils.get(ctx.guild.roles, id=admin_role_id)
return admin_role in author.roles or has_perms_or_is_owner or is_guild_owner
return commands.check(predicate)
def guildowner_or_permissions(**perms):
async def predicate(ctx):
has_perms_or_is_owner = await check_permissions(ctx, perms)
if ctx.guild is None:
return has_perms_or_is_owner
is_guild_owner = ctx.author == ctx.guild.owner
return is_guild_owner or has_perms_or_is_owner
return commands.check(predicate)
def guildowner():
return guildowner_or_permissions()
def admin():
return admin_or_permissions()
def mod():
return mod_or_permissions()

115
redbot/core/cli.py Normal file
View File

@@ -0,0 +1,115 @@
import argparse
import asyncio
from redbot.core.bot import Red
def confirm(m=""):
return input(m).lower().strip() in ("y", "yes")
def interactive_config(red, token_set, prefix_set):
loop = asyncio.get_event_loop()
token = ""
print("Red - Discord Bot | Configuration process\n")
if not token_set:
print("Please enter a valid token:")
while not token:
token = input("> ")
if not len(token) >= 50:
print("That doesn't look like a valid token.")
token = ""
if token:
loop.run_until_complete(red.db.token.set(token))
if not prefix_set:
prefix = ""
print("\nPick a prefix. A prefix is what you type before a "
"command. Example:\n"
"!help\n^ The exclamation mark is the prefix in this case.\n"
"Can be multiple characters. You will be able to change it "
"later and add more of them.\nChoose your prefix:\n")
while not prefix:
prefix = input("Prefix> ")
if len(prefix) > 10:
print("Your prefix seems overly long. Are you sure it "
"is correct? (y/n)")
if not confirm("> "):
prefix = ""
if prefix:
loop.run_until_complete(red.db.prefix.set([prefix]))
ask_sentry(red)
return token
def ask_sentry(red: Red):
loop = asyncio.get_event_loop()
print("\nThank you for installing Red V3 alpha! The current version\n"
" is not suited for production use and is aimed at testing\n"
" the current and upcoming featureset, that's why we will\n"
" also collect the fatal error logs to help us fix any new\n"
" found issues in a timely manner. If you wish to opt in\n"
" the process please type \"yes\":\n")
if not confirm("> "):
loop.run_until_complete(red.db.enable_sentry.set(False))
else:
loop.run_until_complete(red.db.enable_sentry.set(True))
print("\nThank you for helping us with the development process!")
def parse_cli_flags(args):
parser = argparse.ArgumentParser(description="Red - Discord Bot")
parser.add_argument("--owner", type=int,
help="ID of the owner. Only who hosts "
"Red should be owner, this has "
"serious security implications.")
parser.add_argument("--co-owner", type=int, action="append", default=[],
help="ID of a co-owner. Only people who have access "
"to the system that is hosting Red should be "
"co-owners, as this gives them complete access "
"to the system's data. This has serious "
"security implications if misused. Can be "
"multiple.")
parser.add_argument("--prefix", "-p", action="append",
help="Global prefix. Can be multiple")
parser.add_argument("--no-prompt",
action="store_true",
help="Disables console inputs. Features requiring "
"console interaction could be disabled as a "
"result")
parser.add_argument("--no-cogs",
action="store_true",
help="Starts Red with no cogs loaded, only core")
parser.add_argument("--self-bot",
action='store_true',
help="Specifies if Red should log in as selfbot")
parser.add_argument("--not-bot",
action='store_true',
help="Specifies if the token used belongs to a bot "
"account.")
parser.add_argument("--dry-run",
action="store_true",
help="Makes Red quit with code 0 just before the "
"login. This is useful for testing the boot "
"process.")
parser.add_argument("--debug",
action="store_true",
help="Sets the loggers level as debug")
parser.add_argument("--dev",
action="store_true",
help="Enables developer mode")
parser.add_argument("instance_name",
help="Name of the bot instance created during `redbot-setup`.")
args = parser.parse_args(args)
if args.prefix:
args.prefix = sorted(args.prefix, reverse=True)
else:
args.prefix = []
return args

290
redbot/core/cog_manager.py Normal file
View File

@@ -0,0 +1,290 @@
import pkgutil
from importlib import invalidate_caches
from importlib.machinery import ModuleSpec
from pathlib import Path
from typing import Tuple, Union, List
from . import checks
from .config import Config
from .i18n import CogI18n
from .data_manager import cog_data_path
from discord.ext import commands
from .utils.chat_formatting import box
__all__ = ["CogManager"]
class CogManager:
"""
This module allows you to load cogs from multiple directories and even from outside the bot
directory. You may also set a directory for downloader to install new cogs to, the default
being the :code:`cogs/` folder in the root bot directory.
"""
def __init__(self, paths: Tuple[str]=()):
self.conf = Config.get_conf(self, 2938473984732, True)
tmp_cog_install_path = cog_data_path(self) / "cogs"
tmp_cog_install_path.mkdir(parents=True, exist_ok=True)
self.conf.register_global(
paths=(),
install_path=str(tmp_cog_install_path)
)
self._paths = list(paths)
async def paths(self) -> Tuple[Path, ...]:
"""
All currently valid path directories.
"""
conf_paths = await self.conf.paths()
other_paths = self._paths
all_paths = set(list(conf_paths) + list(other_paths))
paths = [Path(p) for p in all_paths]
if self.install_path not in paths:
paths.insert(0, await self.install_path())
return tuple(p.resolve() for p in paths if p.is_dir())
async def install_path(self) -> Path:
"""
The install path for 3rd party cogs.
"""
p = Path(await self.conf.install_path())
return p.resolve()
async def set_install_path(self, path: Path) -> Path:
"""
Install path setter, will return the absolute path to
the given path.
.. note::
The bot will not remember your old cog install path which means
that ALL PREVIOUSLY INSTALLED COGS will now be unfindable.
:param pathlib.Path path:
The new directory for cog installs.
:raises ValueError:
If :code:`path` is not an existing directory.
"""
if not path.is_dir():
raise ValueError("The install path must be an existing directory.")
resolved = path.resolve()
await self.conf.install_path.set(str(resolved))
return resolved
@staticmethod
def _ensure_path_obj(path: Union[Path, str]) -> Path:
"""
Guarantees an object will be a path object.
:param path:
:type path:
pathlib.Path or str
:rtype:
pathlib.Path
"""
try:
path.exists()
except AttributeError:
path = Path(path)
return path
async def add_path(self, path: Union[Path, str]):
"""
Adds a cog path to current list, will ignore duplicates. Does have
a side effect of removing all invalid paths from the saved path
list.
:param path:
Path to add.
:type path:
pathlib.Path or str
:raises ValueError:
If :code:`path` does not resolve to an existing directory.
"""
path = self._ensure_path_obj(path)
# This makes the path absolute, will break if a bot install
# changes OS/Computer?
path = path.resolve()
if not path.is_dir():
raise ValueError("'{}' is not a valid directory.".format(path))
if path == await self.install_path():
raise ValueError("Cannot add the install path as an additional path.")
all_paths = set(await self.paths() + (path, ))
# noinspection PyTypeChecker
await self.set_paths(all_paths)
async def remove_path(self, path: Union[Path, str]) -> Tuple[Path, ...]:
"""
Removes a path from the current paths list.
:param path: Path to remove.
:type path:
pathlib.Path or str
:return:
Tuple of new valid paths.
:rtype: tuple
"""
path = self._ensure_path_obj(path)
all_paths = list(await self.paths())
if path in all_paths:
all_paths.remove(path) # Modifies in place
await self.set_paths(all_paths)
return tuple(all_paths)
async def set_paths(self, paths_: List[Path]):
"""
Sets the current paths list.
:param List[pathlib.Path] paths_:
List of paths to set.
"""
str_paths = [str(p) for p in paths_]
await self.conf.paths.set(str_paths)
async def find_cog(self, name: str) -> ModuleSpec:
"""
Finds a cog in the list of available paths.
:param name:
Name of the cog to find.
:raises RuntimeError:
If there is no cog with the given name.
:return:
A module spec to be used for specialized cog loading.
:rtype:
importlib.machinery.ModuleSpec
"""
resolved_paths = [str(p.resolve()) for p in await self.paths()]
for finder, module_name, _ in pkgutil.iter_modules(resolved_paths):
if name == module_name:
spec = finder.find_spec(name)
if spec:
return spec
raise RuntimeError("No module by the name of '{}' was found"
" in any available path.".format(name))
@staticmethod
def invalidate_caches():
"""
This is an alias for an importlib internal and should be called
any time that a new module has been installed to a cog directory.
*I think.*
"""
invalidate_caches()
_ = CogI18n("CogManagerUI", __file__)
class CogManagerUI:
@commands.command()
@checks.is_owner()
async def paths(self, ctx: commands.Context):
"""
Lists current cog paths in order of priority.
"""
install_path = await ctx.bot.cog_mgr.install_path()
cog_paths = ctx.bot.cog_mgr.paths
cog_paths = [p for p in cog_paths if p != install_path]
msg = _("Install Path: {}\n\n").format(install_path)
partial = []
for i, p in enumerate(cog_paths, start=1):
partial.append("{}. {}".format(i, p))
msg += "\n".join(partial)
await ctx.send(box(msg))
@commands.command()
@checks.is_owner()
async def addpath(self, ctx: commands.Context, path: Path):
"""
Add a path to the list of available cog paths.
"""
if not path.is_dir():
await ctx.send(_("That path is does not exist or does not"
" point to a valid directory."))
return
try:
await ctx.bot.cog_mgr.add_path(path)
except ValueError as e:
await ctx.send(str(e))
else:
await ctx.send(_("Path successfully added."))
@commands.command()
@checks.is_owner()
async def removepath(self, ctx: commands.Context, path_number: int):
"""
Removes a path from the available cog paths given the path_number
from !paths
"""
cog_paths = await ctx.bot.cog_mgr.paths()
try:
to_remove = cog_paths[path_number]
except IndexError:
await ctx.send(_("That is an invalid path number."))
return
await ctx.bot.cog_mgr.remove_path(to_remove)
await ctx.send(_("Path successfully removed."))
@commands.command()
@checks.is_owner()
async def reorderpath(self, ctx: commands.Context, from_: int, to: int):
"""
Reorders paths internally to allow discovery of different cogs.
"""
# Doing this because in the paths command they're 1 indexed
from_ -= 1
to -= 1
all_paths = list(await ctx.bot.cog_mgr.paths())
try:
to_move = all_paths.pop(from_)
except IndexError:
await ctx.send(_("Invalid 'from' index."))
return
try:
all_paths.insert(to, to_move)
except IndexError:
await ctx.send(_("Invalid 'to' index."))
return
await ctx.bot.cog_mgr.set_paths(all_paths)
await ctx.send(_("Paths reordered."))
@commands.command()
@checks.is_owner()
async def installpath(self, ctx: commands.Context, path: Path=None):
"""
Returns the current install path or sets it if one is provided.
The provided path must be absolute or relative to the bot's
directory and it must already exist.
No installed cogs will be transferred in the process.
"""
if path:
if not path.is_absolute():
path = (ctx.bot.main_dir / path).resolve()
try:
await ctx.bot.cog_mgr.set_install_path(path)
except ValueError:
await ctx.send(_("That path does not exist."))
return
install_path = await ctx.bot.cog_mgr.install_path()
await ctx.send(_("The bot will install new cogs to the `{}`"
" directory.").format(install_path))

650
redbot/core/config.py Normal file
View File

@@ -0,0 +1,650 @@
import logging
from copy import deepcopy
from typing import Callable, Union, Tuple
import discord
from .data_manager import cog_data_path, core_data_path
from .drivers.red_json import JSON as JSONDriver
log = logging.getLogger("red.config")
class Value:
"""
A singular "value" of data.
.. py:attribute:: identifiers
This attribute provides all the keys necessary to get a specific data element from a json document.
.. py:attribute:: default
The default value for the data element that :py:attr:`identifiers` points at.
.. py:attribute:: spawner
A reference to :py:attr:`.Config.spawner`.
"""
def __init__(self, identifiers: Tuple[str], default_value, spawner):
self._identifiers = identifiers
self.default = default_value
self.spawner = spawner
@property
def identifiers(self):
return tuple(str(i) for i in self._identifiers)
async def _get(self, default):
driver = self.spawner.get_driver()
try:
ret = await driver.get(self.identifiers)
except KeyError:
return default if default is not None else self.default
return ret
def __call__(self, default=None):
"""
Each :py:class:`Value` object is created by the :py:meth:`Group.__getattr__` method.
The "real" data of the :py:class:`Value` object is accessed by this method. It is a replacement for a
:python:`get()` method.
For example::
foo = await conf.guild(some_guild).foo()
# Is equivalent to this
group_obj = conf.guild(some_guild)
value_obj = conf.foo
foo = await value_obj()
.. important::
This is now, for all intents and purposes, a coroutine.
:param default:
This argument acts as an override for the registered default provided by :py:attr:`default`. This argument
is ignored if its value is :python:`None`.
:type default: Optional[object]
:return:
A coroutine object that must be awaited.
"""
return self._get(default)
async def set(self, value):
"""
Sets the value of the data element indicate by :py:attr:`identifiers`.
For example::
# Sets global value "foo" to False
await conf.foo.set(False)
# Sets guild specific value of "bar" to True
await conf.guild(some_guild).bar.set(True)
"""
driver = self.spawner.get_driver()
await driver.set(self.identifiers, value)
class Group(Value):
"""
A "group" of data, inherits from :py:class:`.Value` which means that all of the attributes and methods available
in :py:class:`.Value` are also available when working with a :py:class:`.Group` object.
.. py:attribute:: defaults
A dictionary of registered default values for this :py:class:`Group`.
.. py:attribute:: force_registration
See :py:attr:`.Config.force_registration`.
"""
def __init__(self, identifiers: Tuple[str],
defaults: dict,
spawner,
force_registration: bool=False):
self._defaults = defaults
self.force_registration = force_registration
self.spawner = spawner
super().__init__(identifiers, {}, self.spawner)
@property
def defaults(self):
return self._defaults.copy()
# noinspection PyTypeChecker
def __getattr__(self, item: str) -> Union["Group", Value]:
"""
Takes in the next accessible item.
1. If it's found to be a group of data we return another :py:class:`Group` object.
2. If it's found to be a data value we return a :py:class:`.Value` object.
3. If it is not found and :py:attr:`force_registration` is :python:`True` then we raise
:py:exc:`AttributeError`.
4. Otherwise return a :py:class:`.Value` object.
:param str item:
The name of the item a cog is attempting to access through normal Python attribute
access.
"""
is_group = self.is_group(item)
is_value = not is_group and self.is_value(item)
new_identifiers = self.identifiers + (item, )
if is_group:
return Group(
identifiers=new_identifiers,
defaults=self._defaults[item],
spawner=self.spawner,
force_registration=self.force_registration
)
elif is_value:
return Value(
identifiers=new_identifiers,
default_value=self._defaults[item],
spawner=self.spawner
)
elif self.force_registration:
raise AttributeError(
"'{}' is not a valid registered Group"
"or value.".format(item)
)
else:
return Value(
identifiers=new_identifiers,
default_value=None,
spawner=self.spawner
)
@property
def _super_group(self) -> 'Group':
super_group = Group(
self.identifiers[:-1],
defaults={},
spawner=self.spawner,
force_registration=self.force_registration
)
return super_group
def is_group(self, item: str) -> bool:
"""
A helper method for :py:meth:`__getattr__`. Most developers will have no need to use this.
:param str item:
See :py:meth:`__getattr__`.
"""
default = self._defaults.get(item)
return isinstance(default, dict)
def is_value(self, item: str) -> bool:
"""
A helper method for :py:meth:`__getattr__`. Most developers will have no need to use this.
:param str item:
See :py:meth:`__getattr__`.
"""
try:
default = self._defaults[item]
except KeyError:
return False
return not isinstance(default, dict)
def get_attr(self, item: str, default=None, resolve=True):
"""
This is available to use as an alternative to using normal Python attribute access. It is required if you find
a need for dynamic attribute access.
.. note::
Use of this method should be avoided wherever possible.
A possible use case::
@commands.command()
async def some_command(self, ctx, item: str):
user = ctx.author
# Where the value of item is the name of the data field in Config
await ctx.send(await self.conf.user(user).get_attr(item))
:param str item:
The name of the data field in :py:class:`.Config`.
:param default:
This is an optional override to the registered default for this item.
:param resolve:
If this is :code:`True` this function will return a coroutine that resolves to a "real" data value,
if :code:`False` this function will return an instance of :py:class:`Group` or :py:class:`Value`
depending on the type of the "real" data value.
:rtype:
Coroutine or Value
"""
value = getattr(self, item)
if resolve:
return value(default=default)
else:
return value
async def all(self) -> dict:
"""
This method allows you to get "all" of a particular group of data. It will return the dictionary of all data
for a particular Guild/Channel/Role/User/Member etc.
.. note::
Any values that have not been set from the registered defaults will have their default values
added to the dictionary that this method returns.
:rtype: dict
"""
defaults = self.defaults
defaults.update(await self())
return defaults
async def all_from_kind(self) -> dict:
"""
This method allows you to get all data from all entries in a given Kind. It will return a dictionary of Kind
ID's -> data.
.. note::
Any values that have not been set from the registered defaults will have their default values
added to the dictionary that this method returns.
.. important::
This method is overridden in :py:meth:`.MemberGroup.all_from_kind` and functions slightly differently.
:rtype: dict
"""
# noinspection PyTypeChecker
all_from_kind = await self._super_group()
for k, v in all_from_kind.items():
defaults = self.defaults
defaults.update(v)
all_from_kind[k] = defaults
return all_from_kind
async def set(self, value):
if not isinstance(value, dict):
raise ValueError(
"You may only set the value of a group to be a dict."
)
await super().set(value)
async def set_attr(self, item: str, value):
"""
Please see :py:meth:`get_attr` for more information.
.. note::
Use of this method should be avoided wherever possible.
"""
value_obj = getattr(self, item)
await value_obj.set(value)
async def clear(self):
"""
Wipes all data from the given Guild/Channel/Role/Member/User. If used on a global group, it will wipe all global
data.
"""
await self.set({})
async def clear_all(self):
"""
Wipes all data from all Guilds/Channels/Roles/Members/Users. If used on a global group, this method wipes all
data from everything.
"""
await self._super_group.set({})
class MemberGroup(Group):
"""
A specific group class for use with member data only. Inherits from :py:class:`.Group`. In this group data is
stored as :code:`GUILD_ID -> MEMBER_ID -> data`.
"""
@property
def _super_group(self) -> Group:
new_identifiers = self.identifiers[:2]
group_obj = Group(
identifiers=new_identifiers,
defaults={},
spawner=self.spawner
)
return group_obj
@property
def _guild_group(self) -> Group:
new_identifiers = self.identifiers[:3]
group_obj = Group(
identifiers=new_identifiers,
defaults={},
spawner=self.spawner
)
return group_obj
async def all_guilds(self) -> dict:
"""
Returns a dict of :code:`GUILD_ID -> MEMBER_ID -> data`.
.. note::
Any values that have not been set from the registered defaults will have their default values
added to the dictionary that this method returns.
:rtype: dict
"""
# noinspection PyTypeChecker
return await super().all_from_kind()
async def all_from_kind(self) -> dict:
"""
Returns a dict of all members from the same guild as the given one.
.. note::
Any values that have not been set from the registered defaults will have their default values
added to the dictionary that this method returns.
:rtype: dict
"""
guild_member = await super().all_from_kind()
return guild_member.get(self.identifiers[-2], {})
class Config:
"""
You should always use :func:`get_conf` or :func:`get_core_conf` to initialize a Config object.
.. important::
Most config data should be accessed through its respective group method (e.g. :py:meth:`guild`)
however the process for accessing global data is a bit different. There is no :python:`global` method
because global data is accessed by normal attribute access::
await conf.foo()
.. py:attribute:: cog_name
The name of the cog that has requested a :py:class:`.Config` object.
.. py:attribute:: unique_identifier
Unique identifier provided to differentiate cog data when name conflicts occur.
.. py:attribute:: spawner
A callable object that returns some driver that implements :py:class:`.drivers.red_base.BaseDriver`.
.. py:attribute:: force_registration
A boolean that determines if :py:class:`.Config` should throw an error if a cog attempts to access an attribute
which has not been previously registered.
.. note::
**You should use this.** By enabling force registration you give :py:class:`.Config` the ability to alert
you instantly if you've made a typo when attempting to access data.
"""
GLOBAL = "GLOBAL"
GUILD = "GUILD"
CHANNEL = "TEXTCHANNEL"
ROLE = "ROLE"
USER = "USER"
MEMBER = "MEMBER"
def __init__(self, cog_name: str, unique_identifier: str,
driver_spawn: Callable,
force_registration: bool=False,
defaults: dict=None):
self.cog_name = cog_name
self.unique_identifier = unique_identifier
self.spawner = driver_spawn
self.force_registration = force_registration
self._defaults = defaults or {}
@property
def defaults(self):
return self._defaults.copy()
@classmethod
def get_conf(cls, cog_instance, identifier: int,
force_registration=False):
"""
Returns a Config instance based on a simplified set of initial
variables.
:param cog_instance:
:param identifier:
Any random integer, used to keep your data
distinct from any other cog with the same name.
:param force_registration:
Should config require registration
of data keys before allowing you to get/set values?
:return:
A new config object.
"""
cog_path_override = cog_data_path(cog_instance)
cog_name = cog_path_override.stem
uuid = str(hash(identifier))
spawner = JSONDriver(cog_name, data_path_override=cog_path_override)
return cls(cog_name=cog_name, unique_identifier=uuid,
force_registration=force_registration,
driver_spawn=spawner)
@classmethod
def get_core_conf(cls, force_registration: bool=False):
"""
All core modules that require a config instance should use this classmethod instead of
:py:meth:`get_conf`
:param int identifier:
See :py:meth:`get_conf`
:param force_registration:
See :py:attr:`force_registration`
:type force_registration: Optional[bool]
"""
driver_spawn = JSONDriver("Core", data_path_override=core_data_path())
return cls(cog_name="Core", driver_spawn=driver_spawn,
unique_identifier='0',
force_registration=force_registration)
def __getattr__(self, item: str) -> Union[Group, Value]:
"""
This is used to generate Value or Group objects for global
values.
:param item:
:return:
"""
global_group = self._get_base_group(self.GLOBAL)
return getattr(global_group, item)
@staticmethod
def _get_defaults_dict(key: str, value) -> dict:
"""
Since we're allowing nested config stuff now, not storing the
_defaults as a flat dict sounds like a good idea. May turn
out to be an awful one but we'll see.
:param key:
:param value:
:return:
"""
ret = {}
partial = ret
splitted = key.split('__')
for i, k in enumerate(splitted, start=1):
if not k.isidentifier():
raise RuntimeError("'{}' is an invalid config key.".format(k))
if i == len(splitted):
partial[k] = value
else:
partial[k] = {}
partial = partial[k]
return ret
@staticmethod
def _update_defaults(to_add: dict, _partial: dict):
"""
This tries to update the _defaults dictionary with the nested
partial dict generated by _get_defaults_dict. This WILL
throw an error if you try to have both a value and a group
registered under the same name.
:param to_add:
:param _partial:
:return:
"""
for k, v in to_add.items():
val_is_dict = isinstance(v, dict)
if k in _partial:
existing_is_dict = isinstance(_partial[k], dict)
if val_is_dict != existing_is_dict:
# != is XOR
raise KeyError("You cannot register a Group and a Value under"
" the same name.")
if val_is_dict:
Config._update_defaults(v, _partial=_partial[k])
else:
_partial[k] = v
else:
_partial[k] = v
def _register_default(self, key: str, **kwargs):
if key not in self._defaults:
self._defaults[key] = {}
data = deepcopy(kwargs)
for k, v in data.items():
to_add = self._get_defaults_dict(k, v)
self._update_defaults(to_add, self._defaults[key])
def register_global(self, **kwargs):
"""
Registers default values for attributes you wish to store in :py:class:`.Config` at a global level.
You can register a single value or multiple values::
conf.register_global(
foo=True
)
conf.register_global(
bar=False,
baz=None
)
You can also now register nested values::
_defaults = {
"foo": {
"bar": True,
"baz": False
}
}
# Will register `foo.bar` == True and `foo.baz` == False
conf.register_global(
**_defaults
)
You can do the same thing without a :python:`_defaults` dict by using double underscore as a variable
name separator::
# This is equivalent to the previous example
conf.register_global(
foo__bar=True,
foo__baz=False
)
"""
self._register_default(self.GLOBAL, **kwargs)
def register_guild(self, **kwargs):
"""
Registers default values on a per-guild level. See :py:meth:`register_global` for more details.
"""
self._register_default(self.GUILD, **kwargs)
def register_channel(self, **kwargs):
"""
Registers default values on a per-guild level. See :py:meth:`register_global` for more details.
"""
# We may need to add a voice channel category later
self._register_default(self.CHANNEL, **kwargs)
def register_role(self, **kwargs):
"""
Registers default values on a per-guild level. See :py:meth:`register_global` for more details.
"""
self._register_default(self.ROLE, **kwargs)
def register_user(self, **kwargs):
"""
Registers default values on a per-guild level. See :py:meth:`register_global` for more details.
"""
self._register_default(self.USER, **kwargs)
def register_member(self, **kwargs):
"""
Registers default values on a per-guild level. See :py:meth:`register_global` for more details.
"""
self._register_default(self.MEMBER, **kwargs)
def _get_base_group(self, key: str, *identifiers: str,
group_class=Group) -> Group:
# noinspection PyTypeChecker
return group_class(
identifiers=(self.unique_identifier, key) + identifiers,
defaults=self._defaults.get(key, {}),
spawner=self.spawner,
force_registration=self.force_registration
)
def guild(self, guild: discord.Guild) -> Group:
"""
Returns a :py:class:`.Group` for the given guild.
:param discord.Guild guild: A discord.py guild object.
"""
return self._get_base_group(self.GUILD, guild.id)
def channel(self, channel: discord.TextChannel) -> Group:
"""
Returns a :py:class:`.Group` for the given channel. This does not currently support differences between
text and voice channels.
:param discord.TextChannel channel: A discord.py text channel object.
"""
return self._get_base_group(self.CHANNEL, channel.id)
def role(self, role: discord.Role) -> Group:
"""
Returns a :py:class:`.Group` for the given role.
:param discord.Role role: A discord.py role object.
"""
return self._get_base_group(self.ROLE, role.id)
def user(self, user: discord.User) -> Group:
"""
Returns a :py:class:`.Group` for the given user.
:param discord.User user: A discord.py user object.
"""
return self._get_base_group(self.USER, user.id)
def member(self, member: discord.Member) -> MemberGroup:
"""
Returns a :py:class:`.MemberGroup` for the given member.
:param discord.Member member: A discord.py member object.
"""
return self._get_base_group(self.MEMBER, member.guild.id, member.id,
group_class=MemberGroup)

View File

@@ -0,0 +1,412 @@
import asyncio
import importlib
import itertools
import logging
import sys
from collections import namedtuple
from random import SystemRandom
from string import ascii_letters, digits
import aiohttp
import discord
from discord.ext import commands
from redbot.core import checks
from redbot.core import i18n
import redbot.cogs # Don't remove this line or core cogs won't load
log = logging.getLogger("red")
OWNER_DISCLAIMER = ("⚠ **Only** the person who is hosting Red should be "
"owner. **This has SERIOUS security implications. The "
"owner can access any data that is present on the host "
"system.** ⚠")
_ = i18n.CogI18n("Core", __file__)
class Core:
"""Commands related to core functions"""
@commands.command()
@checks.is_owner()
async def load(self, ctx, *, cog_name: str):
"""Loads a package"""
try:
spec = await ctx.bot.cog_mgr.find_cog(cog_name)
except RuntimeError:
real_name = ".{}".format(cog_name)
try:
mod = importlib.import_module(real_name, package='redbot.cogs')
except ImportError as e:
await ctx.send(_("No module by that name was found in any"
" cog path."))
return
spec = mod.__spec__
try:
ctx.bot.load_extension(spec)
except Exception as e:
log.exception("Package loading failed", exc_info=e)
await ctx.send(_("Failed to load package. Check your console or "
"logs for details."))
else:
await ctx.bot.add_loaded_package(cog_name)
await ctx.send(_("Done."))
@commands.group()
@checks.is_owner()
async def unload(self, ctx, *, cog_name: str):
"""Unloads a package"""
if cog_name in ctx.bot.extensions:
ctx.bot.unload_extension(cog_name)
await ctx.bot.remove_loaded_package(cog_name)
await ctx.send(_("Done."))
else:
await ctx.send(_("That extension is not loaded."))
@commands.command(name="reload")
@checks.is_owner()
async def _reload(self, ctx, *, cog_name: str):
"""Reloads a package"""
ctx.bot.unload_extension(cog_name)
self.cleanup_and_refresh_modules(cog_name)
try:
spec = await ctx.bot.cog_mgr.find_cog(cog_name)
ctx.bot.load_extension(spec)
except Exception as e:
log.exception("Package reloading failed", exc_info=e)
await ctx.send(_("Failed to reload package. Check your console or "
"logs for details."))
else:
curr_pkgs = await ctx.bot.db.packages()
await ctx.bot.save_packages_status(curr_pkgs)
await ctx.send(_("Done."))
@commands.command(name="shutdown")
@checks.is_owner()
async def _shutdown(self, ctx, silently: bool=False):
"""Shuts down the bot"""
wave = "\N{WAVING HAND SIGN}"
skin = "\N{EMOJI MODIFIER FITZPATRICK TYPE-3}"
try: # We don't want missing perms to stop our shutdown
if not silently:
await ctx.send(_("Shutting down... ") + wave + skin)
except:
pass
await ctx.bot.shutdown()
def cleanup_and_refresh_modules(self, module_name: str):
"""Interally reloads modules so that changes are detected"""
splitted = module_name.split('.')
def maybe_reload(new_name):
try:
lib = sys.modules[new_name]
except KeyError:
pass
else:
importlib._bootstrap._exec(lib.__spec__, lib)
modules = itertools.accumulate(splitted, lambda old, next: "{}.{}".format(old, next))
for m in modules:
maybe_reload(m)
children = {name: lib for name, lib in sys.modules.items() if name.startswith(module_name)}
for child_name, lib in children.items():
importlib._bootstrap._exec(lib.__spec__, lib)
@commands.group(name="set")
async def _set(self, ctx):
"""Changes Red's settings"""
if ctx.invoked_subcommand is None:
await ctx.bot.send_cmd_help(ctx)
@_set.command()
@checks.guildowner()
@commands.guild_only()
async def adminrole(self, ctx, *, role: discord.Role):
"""Sets the admin role for this server"""
await ctx.bot.db.guild(ctx.guild).admin_role.set(role.id)
await ctx.send(_("The admin role for this guild has been set."))
@_set.command()
@checks.guildowner()
@commands.guild_only()
async def modrole(self, ctx, *, role: discord.Role):
"""Sets the mod role for this server"""
await ctx.bot.db.guild(ctx.guild).mod_role.set(role.id)
await ctx.send(_("The mod role for this guild has been set."))
@_set.command()
@checks.is_owner()
async def avatar(self, ctx, url: str):
"""Sets Red's avatar"""
session = aiohttp.ClientSession()
async with session.get(url) as r:
data = await r.read()
await session.close()
try:
await ctx.bot.user.edit(avatar=data)
except discord.HTTPException:
await ctx.send(_("Failed. Remember that you can edit my avatar "
"up to two times a hour. The URL must be a "
"direct link to a JPG / PNG."))
except discord.InvalidArgument:
await ctx.send(_("JPG / PNG format only."))
else:
await ctx.send(_("Done."))
@_set.command(name="game")
@checks.is_owner()
@commands.guild_only()
async def _game(self, ctx, *, game: str):
"""Sets Red's playing status"""
status = ctx.me.status
game = discord.Game(name=game)
await ctx.bot.change_presence(status=status, game=game)
await ctx.send(_("Game set."))
@_set.command()
@checks.is_owner()
@commands.guild_only()
async def status(self, ctx, *, status: str):
"""Sets Red's status
Available statuses:
online
idle
dnd
invisible"""
statuses = {
"online" : discord.Status.online,
"idle" : discord.Status.idle,
"dnd" : discord.Status.dnd,
"invisible" : discord.Status.invisible
}
game = ctx.me.game
try:
status = statuses[status.lower()]
except KeyError:
await ctx.bot.send_cmd_help(ctx)
else:
await ctx.bot.change_presence(status=status,
game=game)
await ctx.send(_("Status changed to %s.") % status)
@_set.command()
@checks.is_owner()
@commands.guild_only()
async def stream(self, ctx, streamer=None, *, stream_title=None):
"""Sets Red's streaming status
Leaving both streamer and stream_title empty will clear it."""
status = ctx.me.status
if stream_title:
stream_title = stream_title.strip()
if "twitch.tv/" not in streamer:
streamer = "https://www.twitch.tv/" + streamer
game = discord.Game(type=1, url=streamer, name=stream_title)
await ctx.bot.change_presence(game=game, status=status)
elif streamer is not None:
await ctx.bot.send_cmd_help(ctx)
return
else:
await ctx.bot.change_presence(game=None, status=status)
await ctx.send(_("Done."))
@_set.command(name="username", aliases=["name"])
@checks.is_owner()
async def _username(self, ctx, *, username: str):
"""Sets Red's username"""
try:
await ctx.bot.user.edit(username=username)
except discord.HTTPException:
await ctx.send(_("Failed to change name. Remember that you can "
"only do it up to 2 times an hour. Use "
"nicknames if you need frequent changes. "
"`{}set nickname`").format(ctx.prefix))
else:
await ctx.send(_("Done."))
@_set.command(name="nickname")
@checks.admin()
@commands.guild_only()
async def _nickname(self, ctx, *, nickname: str):
"""Sets Red's nickname"""
try:
await ctx.bot.user.edit(nick=nickname)
except discord.Forbidden:
await ctx.send(_("I lack the permissions to change my own "
"nickname."))
else:
await ctx.send("Done.")
@_set.command(aliases=["prefixes"])
@checks.is_owner()
async def prefix(self, ctx, *prefixes):
"""Sets Red's global prefix(es)"""
if not prefixes:
await ctx.bot.send_cmd_help(ctx)
return
prefixes = sorted(prefixes, reverse=True)
await ctx.bot.db.prefix.set(prefixes)
await ctx.send(_("Prefix set."))
@_set.command(aliases=["serverprefixes"])
@checks.admin()
@commands.guild_only()
async def serverprefix(self, ctx, *prefixes):
"""Sets Red's server prefix(es)"""
if not prefixes:
await ctx.bot.db.guild(ctx.guild).prefix.set([])
await ctx.send(_("Guild prefixes have been reset."))
return
prefixes = sorted(prefixes, reverse=True)
await ctx.bot.db.guild(ctx.guild).prefix.set(prefixes)
await ctx.send(_("Prefix set."))
@_set.command()
@commands.cooldown(1, 60 * 10, commands.BucketType.default)
async def owner(self, ctx):
"""Sets Red's main owner"""
def check(m):
return m.author == ctx.author and m.channel == ctx.channel
# According to the Python docs this is suitable for cryptographic use
random = SystemRandom()
length = random.randint(25, 35)
chars = ascii_letters + digits
token = ""
for i in range(length):
token += random.choice(chars)
log.info("{0} ({0.id}) requested to be set as owner."
"".format(ctx.author))
print(_("\nVerification token:"))
print(token)
await ctx.send(_("Remember:\n") + OWNER_DISCLAIMER)
await asyncio.sleep(5)
await ctx.send(_("I have printed a one-time token in the console. "
"Copy and paste it here to confirm you are the owner."))
try:
message = await ctx.bot.wait_for("message", check=check,
timeout=60)
except asyncio.TimeoutError:
self.owner.reset_cooldown(ctx)
await ctx.send(_("The set owner request has timed out."))
else:
if message.content.strip() == token:
self.owner.reset_cooldown(ctx)
await ctx.bot.db.owner.set(ctx.author.id)
ctx.bot.owner_id = ctx.author.id
await ctx.send(_("You have been set as owner."))
else:
await ctx.send(_("Invalid token."))
@_set.command()
@checks.is_owner()
async def locale(self, ctx: commands.Context, locale_name: str):
"""
Changes bot locale.
"""
i18n.set_locale(locale_name)
await ctx.bot.db.locale.set(locale_name)
await ctx.send(_("Locale has been set."))
@commands.command()
@commands.cooldown(1, 60, commands.BucketType.user)
async def contact(self, ctx, *, message: str):
"""Sends a message to the owner"""
guild = ctx.message.guild
owner = discord.utils.get(ctx.bot.get_all_members(),
id=ctx.bot.owner_id)
author = ctx.message.author
footer = _("User ID: %s") % author.id
if ctx.guild is None:
source = _("through DM")
else:
source = _("from {}").format(guild)
footer += _(" | Server ID: %s") % guild.id
# We need to grab the DM command prefix (global)
# Since it can also be set through cli flags, bot.db is not a reliable
# source. So we'll just mock a DM message instead.
fake_message = namedtuple('Message', 'guild')
prefixes = await ctx.bot.command_prefix(ctx.bot, fake_message(guild=None))
prefix = prefixes[0]
content = _("Use `{}dm {} <text>` to reply to this user"
"").format(prefix, author.id)
if isinstance(author, discord.Member):
colour = author.colour
else:
colour = discord.Colour.red()
description = _("Sent by {} {}").format(author, source)
e = discord.Embed(colour=colour, description=message)
if author.avatar_url:
e.set_author(name=description, icon_url=author.avatar_url)
else:
e.set_author(name=description)
e.set_footer(text=footer)
try:
await owner.send(content, embed=e)
except discord.InvalidArgument:
await ctx.send(_("I cannot send your message, I'm unable to find "
"my owner... *sigh*"))
except:
await ctx.send(_("I'm unable to deliver your message. Sorry."))
else:
await ctx.send(_("Your message has been sent."))
@commands.command()
@checks.is_owner()
async def dm(self, ctx, user_id: int, *, message: str):
"""Sends a DM to a user
This command needs a user id to work.
To get a user id enable 'developer mode' in Discord's
settings, 'appearance' tab. Then right click a user
and copy their id"""
destination = discord.utils.get(ctx.bot.get_all_members(),
id=user_id)
if destination is None:
await ctx.send(_("Invalid ID or user not found. You can only "
"send messages to people I share a server "
"with."))
return
e = discord.Embed(colour=discord.Colour.red(), description=message)
description = _("Owner of %s") % ctx.bot.user
fake_message = namedtuple('Message', 'guild')
prefixes = await ctx.bot.command_prefix(ctx.bot, fake_message(guild=None))
prefix = prefixes[0]
e.set_footer(text=_("You can reply to this message with %scontact"
"") % prefix)
if ctx.bot.user.avatar_url:
e.set_author(name=description, icon_url=ctx.bot.user.avatar_url)
else:
e.set_author(name=description)
try:
await destination.send(embed=e)
except:
await ctx.send(_("Sorry, I couldn't deliver your message "
"to %s") % destination)
else:
await ctx.send(_("Message delivered to %s") % destination)

View File

@@ -0,0 +1,74 @@
import sys
from pathlib import Path
import appdirs
from .json_io import JsonIO
jsonio = None
basic_config = None
basic_config_default = {
"DATA_PATH": None,
"COG_PATH_APPEND": "cogs",
"CORE_PATH_APPEND": "core"
}
config_dir = Path(appdirs.AppDirs("Red-DiscordBot").user_config_dir)
config_file = config_dir / 'config.json'
def load_basic_configuration(instance_name: str):
global jsonio
global basic_config
jsonio = JsonIO(config_file)
try:
config = jsonio._load_json()
basic_config = config[instance_name]
except (FileNotFoundError, KeyError):
print("You need to configure the bot instance using `redbot-setup`"
" prior to running the bot.")
sys.exit(1)
def _base_data_path() -> Path:
if basic_config is None:
raise RuntimeError("You must load the basic config before you"
" can get the base data path.")
path = basic_config['DATA_PATH']
return Path(path).resolve()
def cog_data_path(cog_instance=None) -> Path:
"""
Gets the base cog data path. If you want to get the folder with
which to store your own cog's data please pass in an instance
of your cog class.
:param cog_instance:
:return:
"""
try:
base_data_path = Path(_base_data_path())
except RuntimeError as e:
raise RuntimeError("You must load the basic config before you"
" can get the cog data path.") from e
cog_path = base_data_path / basic_config['COG_PATH_APPEND']
if cog_instance:
cog_path = cog_path / cog_instance.__class__.__name__
cog_path.mkdir(exist_ok=True, parents=True)
return cog_path.resolve()
def core_data_path() -> Path:
try:
base_data_path = Path(_base_data_path())
except RuntimeError as e:
raise RuntimeError("You must load the basic config before you"
" can get the core data path.") from e
core_path = base_data_path / basic_config['CORE_PATH_APPEND']
core_path.mkdir(exist_ok=True, parents=True)
return core_path.resolve()

271
redbot/core/dev_commands.py Normal file
View File

@@ -0,0 +1,271 @@
import asyncio
import inspect
import io
import textwrap
import traceback
from contextlib import redirect_stdout
import discord
from . import checks
from .i18n import CogI18n
from discord.ext import commands
from .utils.chat_formatting import box, pagify
"""
Notice:
95% of the below code came from R.Danny which can be found here:
https://github.com/Rapptz/RoboDanny/blob/master/cogs/repl.py
"""
_ = CogI18n("Dev", __file__)
class Dev:
"""Various development focused utilities"""
def __init__(self):
self._last_result = None
self.sessions = set()
@staticmethod
def cleanup_code(content):
"""Automatically removes code blocks from the code."""
# remove ```py\n```
if content.startswith('```') and content.endswith('```'):
return '\n'.join(content.split('\n')[1:-1])
# remove `foo`
return content.strip('` \n')
@staticmethod
def get_syntax_error(e):
if e.text is None:
return '```py\n{0.__class__.__name__}: {0}\n```'.format(e)
return '```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```'.format(e, '^', type(e).__name__)
@staticmethod
def sanitize_output(ctx: commands.Context, input: str) -> str:
token = ctx.bot.http.token
r = "[EXPUNGED]"
result = input.replace(token, r)
result = result.replace(token.lower(), r)
result = result.replace(token.upper(), r)
return result
@commands.command()
@checks.is_owner()
async def debug(self, ctx, *, code):
"""
Executes code and prints the result to discord.
"""
env = {
'bot': ctx.bot,
'ctx': ctx,
'channel': ctx.channel,
'author': ctx.author,
'guild': ctx.guild,
'message': ctx.message
}
code = self.cleanup_code(code)
try:
result = eval(code, env, locals())
except SyntaxError as e:
await ctx.send(self.get_syntax_error(e))
return
except Exception as e:
await ctx.send('```py\n{}: {}```'.format(type(e).__name__, str(e)), )
return
if asyncio.iscoroutine(result):
result = await result
result = str(result)
result = self.sanitize_output(ctx, result)
await ctx.send(box(result, lang="py"))
@commands.command(name='eval')
@checks.is_owner()
async def _eval(self, ctx, *, body: str):
"""
Executes code as if it was the body of an async function
code MUST be in a code block using three ticks and
there MUST be a newline after the first set and
before the last set. This function will ONLY output
the return value of the function code AND anything
that is output to stdout (e.g. using a print()
statement).
"""
env = {
'bot': ctx.bot,
'ctx': ctx,
'channel': ctx.channel,
'author': ctx.author,
'guild': ctx.guild,
'message': ctx.message,
'_': self._last_result
}
env.update(globals())
body = self.cleanup_code(body)
stdout = io.StringIO()
to_compile = 'async def func():\n%s' % textwrap.indent(body, ' ')
try:
exec(to_compile, env)
except SyntaxError as e:
return await ctx.send(self.get_syntax_error(e))
func = env['func']
try:
with redirect_stdout(stdout):
ret = await func()
except:
value = stdout.getvalue()
await ctx.send(box('\n{}{}'.format(value, traceback.format_exc()), lang="py"))
else:
value = stdout.getvalue()
try:
await ctx.bot.add_reaction(ctx.message, '\u2705')
except:
pass
if ret is None:
if value:
value = self.sanitize_output(ctx, value)
await ctx.send(box(value, lang="py"))
else:
ret = self.sanitize_output(ctx, ret)
self._last_result = ret
await ctx.send(box("{}{}".format(value, ret), lang="py"))
@commands.command()
@checks.is_owner()
async def repl(self, ctx):
"""
Opens an interactive REPL.
"""
variables = {
'ctx': ctx,
'bot': ctx.bot,
'message': ctx.message,
'guild': ctx.guild,
'channel': ctx.channel,
'author': ctx.author,
'_': None,
}
if ctx.channel.id in self.sessions:
await ctx.send(_('Already running a REPL session in this channel. Exit it with `quit`.'))
return
self.sessions.add(ctx.channel.id)
await ctx.send(_('Enter code to execute or evaluate. `exit()` or `quit` to exit.'))
def msg_check(m):
return m.author == ctx.author and m.channel == ctx.channel and \
m.content.startswith('`')
while True:
response = await ctx.bot.wait_for(
"message",
check=msg_check)
cleaned = self.cleanup_code(response.content)
if cleaned in ('quit', 'exit', 'exit()'):
await ctx.send('Exiting.')
self.sessions.remove(ctx.channel.id)
return
executor = exec
if cleaned.count('\n') == 0:
# single statement, potentially 'eval'
try:
code = compile(cleaned, '<repl session>', 'eval')
except SyntaxError:
pass
else:
executor = eval
if executor is exec:
try:
code = compile(cleaned, '<repl session>', 'exec')
except SyntaxError as e:
await ctx.send(self.get_syntax_error(e))
continue
variables['message'] = response
stdout = io.StringIO()
msg = None
try:
with redirect_stdout(stdout):
result = executor(code, variables)
if inspect.isawaitable(result):
result = await result
except:
value = stdout.getvalue()
value = self.sanitize_output(ctx, value)
msg = "{}{}".format(value, traceback.format_exc())
else:
value = stdout.getvalue()
if result is not None:
msg = "{}{}".format(value, result)
variables['_'] = result
elif value:
msg = "{}".format(value)
try:
for page in pagify(str(msg), shorten_by=12):
page = self.sanitize_output(ctx, page)
await ctx.send(box(page, "py"))
except discord.Forbidden:
pass
except discord.HTTPException as e:
await ctx.send(_('Unexpected error: `{}`').format(e))
@commands.command()
@checks.is_owner()
async def mock(self, ctx, user: discord.Member, *, command):
"""Runs a command as if it was issued by a different user
The prefix must not be entered"""
# Since we have stateful objects now this might be pretty bad
# Sorry Danny
old_author = ctx.author
old_content = ctx.message.content
ctx.message.author = user
ctx.message.content = ctx.prefix + command
await ctx.bot.process_commands(ctx.message)
ctx.message.author = old_author
ctx.message.content = old_content
@commands.command(name="mockmsg")
@checks.is_owner()
async def mock_msg(self, ctx, user: discord.Member, *, content: str):
"""Bot receives a message is if it were sent by a different user.
Only reads the raw content of the message. Attachments, embeds etc. are ignored."""
old_author = ctx.author
old_content = ctx.message.content
ctx.message.author = user
ctx.message.content = content
ctx.bot.dispatch("message", ctx.message)
await asyncio.sleep(2) # If we change the author and content back too quickly,
# the bot won't process the mocked message in time.
ctx.message.author = old_author
ctx.message.content = old_content

View File

View File

@@ -0,0 +1,12 @@
from typing import Tuple
class BaseDriver:
def get_driver(self):
raise NotImplementedError
async def get(self, identifiers: Tuple[str]):
raise NotImplementedError
async def set(self, identifiers: Tuple[str], value):
raise NotImplementedError

View File

@@ -0,0 +1,49 @@
from pathlib import Path
from typing import Tuple
from ..json_io import JsonIO
from .red_base import BaseDriver
class JSON(BaseDriver):
def __init__(self, cog_name, *, data_path_override: Path=None,
file_name_override: str="settings.json"):
super().__init__()
self.cog_name = cog_name
self.file_name = file_name_override
if data_path_override:
self.data_path = data_path_override
else:
self.data_path = Path.cwd() / 'cogs' / '.data' / self.cog_name
self.data_path.mkdir(parents=True, exist_ok=True)
self.data_path = self.data_path / self.file_name
self.jsonIO = JsonIO(self.data_path)
try:
self.data = self.jsonIO._load_json()
except FileNotFoundError:
self.data = {}
self.jsonIO._save_json(self.data)
def get_driver(self):
return self
async def get(self, identifiers: Tuple[str]):
partial = self.data
for i in identifiers:
partial = partial[i]
return partial
async def set(self, identifiers, value):
partial = self.data
for i in identifiers[:-1]:
if i not in partial:
partial[i] = {}
partial = partial[i]
partial[identifiers[-1]] = value
await self.jsonIO._threadsafe_save_json(self.data)

View File

@@ -0,0 +1,211 @@
import pymongo as m
from .red_base import BaseDriver
class RedMongoException(Exception):
"""Base Red Mongo Exception class"""
pass
class MultipleMatches(RedMongoException):
"""Raised when multiple documents match a single cog_name and
cog_identifier pair."""
pass
class MissingCollection(RedMongoException):
"""Raised when a collection is missing from the mongo db"""
pass
class Mongo(BaseDriver):
def __init__(self, host, port=27017, admin_user=None, admin_pass=None,
**kwargs):
self.conn = m.MongoClient(host=host, port=port, **kwargs)
self.admin_user = admin_user
self.admin_pass = admin_pass
self._db = self.conn.red
if self.admin_user is not None and self.admin_pass is not None:
self._db.authenticate(self.admin_user, self.admin_pass)
self._global = self._db.GLOBAL
self._guild = self._db.GUILD
self._channel = self._db.CHANNEL
self._role = self._db.ROLE
self._member = self._db.MEMBER
self._user = self._db.USER
def get_global(self, cog_name, cog_identifier, _, key, *, default=None):
doc = self._global.find(
{"cog_name": cog_name, "cog_identifier": cog_identifier},
projection=[key, ], batch_size=2)
if doc.count() == 2:
raise MultipleMatches("Too many matching documents at the GLOBAL"
" level: ({}, {})".format(cog_name,
cog_identifier))
elif doc.count() == 1:
return doc[0].get(key, default)
return default
def get_guild(self, cog_name, cog_identifier, guild_id, key, *,
default=None):
doc = self._guild.find(
{"cog_name": cog_name, "cog_identifier": cog_identifier,
"guild_id": guild_id},
projection=[key, ], batch_size=2)
if doc.count() == 2:
raise MultipleMatches("Too many matching documents at the GUILD"
" level: ({}, {}, {})".format(
cog_name, cog_identifier, guild_id))
elif doc.count() == 1:
return doc[0].get(key, default)
return default
def get_channel(self, cog_name, cog_identifier, channel_id, key, *,
default=None):
doc = self._channel.find(
{"cog_name": cog_name, "cog_identifier": cog_identifier,
"channel_id": channel_id},
projection=[key, ], batch_size=2)
if doc.count() == 2:
raise MultipleMatches("Too many matching documents at the CHANNEL"
" level: ({}, {}, {})".format(
cog_name, cog_identifier, channel_id))
elif doc.count() == 1:
return doc[0].get(key, default)
return default
def get_role(self, cog_name, cog_identifier, role_id, key, *,
default=None):
doc = self._role.find(
{"cog_name": cog_name, "cog_identifier": cog_identifier,
"role_id": role_id},
projection=[key, ], batch_size=2)
if doc.count() == 2:
raise MultipleMatches("Too many matching documents at the ROLE"
" level: ({}, {}, {})".format(
cog_name, cog_identifier, role_id))
elif doc.count() == 1:
return doc[0].get(key, default)
return default
def get_member(self, cog_name, cog_identifier, user_id, guild_id, key, *,
default=None):
doc = self._member.find(
{"cog_name": cog_name, "cog_identifier": cog_identifier,
"user_id": user_id, "guild_id": guild_id},
projection=[key, ], batch_size=2)
if doc.count() == 2:
raise MultipleMatches("Too many matching documents at the MEMBER"
" level: ({}, {}, mid {}, sid {})".format(
cog_name, cog_identifier, user_id,
guild_id))
elif doc.count() == 1:
return doc[0].get(key, default)
return default
def get_user(self, cog_name, cog_identifier, user_id, key, *,
default=None):
doc = self._user.find(
{"cog_name": cog_name, "cog_identifier": cog_identifier,
"user_id": user_id},
projection=[key, ], batch_size=2)
if doc.count() == 2:
raise MultipleMatches("Too many matching documents at the USER"
" level: ({}, {}, mid {})".format(
cog_name, cog_identifier, user_id))
elif doc.count() == 1:
return doc[0].get(key, default)
else:
return default
def set_global(self, cog_name, cog_identifier, key, value, clear=False):
filter = {"cog_name": cog_name, "cog_identifier": cog_identifier}
data = {"$set": {key: value}}
if self._global.count(filter) > 1:
raise MultipleMatches("Too many matching documents at the GLOBAL"
" level: ({}, {})".format(cog_name,
cog_identifier))
else:
if clear:
self._global.delete_one(filter)
else:
self._global.update_one(filter, data, upsert=True)
def set_guild(self, cog_name, cog_identifier, guild_id, key, value,
clear=False):
filter = {"cog_name": cog_name, "cog_identifier": cog_identifier,
"guild_id": guild_id}
data = {"$set": {key: value}}
if self._guild.count(filter) > 1:
raise MultipleMatches("Too many matching documents at the GUILD"
" level: ({}, {}, {})".format(
cog_name, cog_identifier, guild_id))
else:
if clear:
self._guild.delete_one(filter)
else:
self._guild.update_one(filter, data, upsert=True)
def set_channel(self, cog_name, cog_identifier, channel_id, key, value,
clear=False):
filter = {"cog_name": cog_name, "cog_identifier": cog_identifier,
"channel_id": channel_id}
data = {"$set": {key: value}}
if self._channel.count(filter) > 1:
raise MultipleMatches("Too many matching documents at the CHANNEL"
" level: ({}, {}, {})".format(
cog_name, cog_identifier, channel_id))
else:
if clear:
self._channel.delete_one(filter)
else:
self._channel.update_one(filter, data, upsert=True)
def set_role(self, cog_name, cog_identifier, role_id, key, value,
clear=False):
filter = {"cog_name": cog_name, "cog_identifier": cog_identifier,
"role_id": role_id}
data = {"$set": {key: value}}
if self._role.count(filter) > 1:
raise MultipleMatches("Too many matching documents at the ROLE"
" level: ({}, {}, {})".format(
cog_name, cog_identifier, role_id))
else:
if clear:
self._role.delete_one(filter)
else:
self._role.update_one(filter, data, upsert=True)
def set_member(self, cog_name, cog_identifier, user_id, guild_id, key,
value, clear=False):
filter = {"cog_name": cog_name, "cog_identifier": cog_identifier,
"guild_id": guild_id, "user_id": user_id}
data = {"$set": {key: value}}
if self._member.count(filter) > 1:
raise MultipleMatches("Too many matching documents at the MEMBER"
" level: ({}, {}, mid {}, sid {})".format(
cog_name, cog_identifier, user_id,
guild_id))
else:
if clear:
self._member.delete_one(filter)
else:
self._member.update_one(filter, data, upsert=True)
def set_user(self, cog_name, cog_identifier, user_id, key, value,
clear=False):
filter = {"cog_name": cog_name, "cog_identifier": cog_identifier,
"user_id": user_id}
data = {"$set": {key: value}}
if self._user.count(filter) > 1:
raise MultipleMatches("Too many matching documents at the USER"
" level: ({}, {}, mid {})".format(
cog_name, cog_identifier, user_id))
else:
if clear:
self._user.delete_one(filter)
else:
self._user.update_one(filter, data, upsert=True)

135
redbot/core/events.py Normal file
View File

@@ -0,0 +1,135 @@
import datetime
import logging
import traceback
import discord
from .sentry_setup import should_log
from discord.ext import commands
from .utils.chat_formatting import inline
log = logging.getLogger("red")
sentry_log = logging.getLogger("red.sentry")
INTRO = ("{0}===================\n"
"{0} Red - Discord Bot \n"
"{0}===================\n"
"".format(" " * 20))
def init_events(bot, cli_flags):
@bot.event
async def on_connect():
if bot.uptime is None:
print("Connected to Discord. Getting ready...")
@bot.event
async def on_ready():
if bot.uptime is not None:
return
bot.uptime = datetime.datetime.utcnow()
print(INTRO)
if cli_flags.no_cogs is False:
print("Loading packages...")
failed = []
packages = await bot.db.packages()
for package in packages:
try:
spec = await bot.cog_mgr.find_cog(package)
bot.load_extension(spec)
except Exception as e:
log.exception("Failed to load package {}".format(package),
exc_info=e)
await bot.remove_loaded_package(package)
if packages:
print("Loaded packages: " + ", ".join(packages))
guilds = len(bot.guilds)
users = len(set([m for m in bot.get_all_members()]))
try:
data = await bot.application_info()
invite_url = discord.utils.oauth_url(data.id)
except:
if bot.user.bot:
invite_url = "Could not fetch invite url"
else:
invite_url = None
if guilds:
print("Ready and operational on {} servers with a total of {} "
"users.".format(guilds, users))
else:
print("Ready. I'm not in any server yet!")
if invite_url:
print("\nInvite URL: {}\n".format(invite_url))
@bot.event
async def on_command_error(ctx, error):
if isinstance(error, commands.MissingRequiredArgument):
await bot.send_cmd_help(ctx)
elif isinstance(error, commands.BadArgument):
await bot.send_cmd_help(ctx)
elif isinstance(error, commands.DisabledCommand):
await ctx.send("That command is disabled.")
elif isinstance(error, commands.CommandInvokeError):
# Need to test if the following still works
"""
no_dms = "Cannot send messages to this user"
is_help_cmd = ctx.command.qualified_name == "help"
is_forbidden = isinstance(error.original, discord.Forbidden)
if is_help_cmd and is_forbidden and error.original.text == no_dms:
msg = ("I couldn't send the help message to you in DM. Either"
" you blocked me or you disabled DMs in this server.")
await ctx.send(msg)
return
"""
log.exception("Exception in command '{}'"
"".format(ctx.command.qualified_name),
exc_info=error.original)
message = ("Error in command '{}'. Check your console or "
"logs for details."
"".format(ctx.command.qualified_name))
exception_log = ("Exception in command '{}'\n"
"".format(ctx.command.qualified_name))
exception_log += "".join(traceback.format_exception(type(error),
error, error.__traceback__))
bot._last_exception = exception_log
await ctx.send(inline(message))
module = ctx.command.module
if should_log(module):
sentry_log.exception("Exception in command '{}'"
"".format(ctx.command.qualified_name),
exc_info=error.original)
elif isinstance(error, commands.CommandNotFound):
pass
elif isinstance(error, commands.CheckFailure):
await ctx.send("⛔ You are not authorized to issue that command.")
elif isinstance(error, commands.NoPrivateMessage):
await ctx.send("That command is not available in DMs.")
elif isinstance(error, commands.CommandOnCooldown):
await ctx.send("This command is on cooldown. "
"Try again in {:.2f}s"
"".format(error.retry_after))
else:
log.exception(type(error).__name__, exc_info=error)
@bot.event
async def on_message(message):
bot.counter["messages_read"] += 1
await bot.process_commands(message)
@bot.event
async def on_resumed():
bot.counter["sessions_resumed"] += 1
@bot.event
async def on_command(command):
bot.counter["processed_commands"] += 1

View File

@@ -0,0 +1,38 @@
"""The checks in this module run on every command."""
from discord.ext import commands
def init_global_checks(bot):
@bot.check
async def global_perms(ctx):
"""Check the user is/isn't globally whitelisted/blacklisted."""
if await bot.is_owner(ctx.author):
return True
whitelist = await bot.db.whitelist()
if whitelist:
return ctx.author.id in whitelist
return ctx.author.id not in await bot.db.blacklist()
@bot.check
async def local_perms(ctx: commands.Context):
"""Check the user is/isn't locally whitelisted/blacklisted."""
if await bot.is_owner(ctx.author):
return True
elif ctx.guild is None:
return True
guild_settings = bot.db.guild(ctx.guild)
local_blacklist = await guild_settings.blacklist()
local_whitelist = await guild_settings.whitelist()
if local_whitelist:
return ctx.author.id in local_whitelist
return ctx.author.id not in local_blacklist
@bot.check
async def bots(ctx):
"""Check the user is not another bot."""
return not ctx.author.bot

203
redbot/core/i18n.py Normal file
View File

@@ -0,0 +1,203 @@
import re
from pathlib import Path
__all__ = ['get_locale', 'set_locale', 'reload_locales', 'CogI18n']
_current_locale = 'en_us'
WAITING_FOR_MSGID = 1
IN_MSGID = 2
WAITING_FOR_MSGSTR = 3
IN_MSGSTR = 4
MSGID = 'msgid "'
MSGSTR = 'msgstr "'
_i18n_cogs = {}
def get_locale():
return _current_locale
def set_locale(locale):
global _current_locale
_current_locale = locale
reload_locales()
def reload_locales():
for cog_name, i18n in _i18n_cogs.items():
i18n.load_translations()
def _parse(translation_file):
"""
Custom gettext parsing of translation files. All credit for this code goes
to ProgVal/Valentin Lorentz and the Limnoria project.
https://github.com/ProgVal/Limnoria/blob/master/src/i18n.py
:param translation_file:
An open file-like object containing translations.
:return:
A set of 2-tuples containing the original string and the translated version.
"""
step = WAITING_FOR_MSGID
translations = set()
for line in translation_file:
line = line[0:-1] # Remove the ending \n
line = line
if line.startswith(MSGID):
# Don't check if step is WAITING_FOR_MSGID
untranslated = ''
translated = ''
data = line[len(MSGID):-1]
if len(data) == 0: # Multiline mode
step = IN_MSGID
else:
untranslated += data
step = WAITING_FOR_MSGSTR
elif step is IN_MSGID and line.startswith('"') and \
line.endswith('"'):
untranslated += line[1:-1]
elif step is IN_MSGID and untranslated == '': # Empty MSGID
step = WAITING_FOR_MSGID
elif step is IN_MSGID: # the MSGID is finished
step = WAITING_FOR_MSGSTR
if step is WAITING_FOR_MSGSTR and line.startswith(MSGSTR):
data = line[len(MSGSTR):-1]
if len(data) == 0: # Multiline mode
step = IN_MSGSTR
else:
translations |= {(untranslated, data)}
step = WAITING_FOR_MSGID
elif step is IN_MSGSTR and line.startswith('"') and \
line.endswith('"'):
translated += line[1:-1]
elif step is IN_MSGSTR: # the MSGSTR is finished
step = WAITING_FOR_MSGID
if translated == '':
translated = untranslated
translations |= {(untranslated, translated)}
if step is IN_MSGSTR:
if translated == '':
translated = untranslated
translations |= {(untranslated, translated)}
return translations
def _normalize(string, remove_newline=False):
"""
String normalization.
All credit for this code goes
to ProgVal/Valentin Lorentz and the Limnoria project.
https://github.com/ProgVal/Limnoria/blob/master/src/i18n.py
:param string:
:param remove_newline:
:return:
"""
def normalize_whitespace(s):
"""Normalizes the whitespace in a string; \s+ becomes one space."""
if not s:
return str(s) # not the same reference
starts_with_space = (s[0] in ' \n\t\r')
ends_with_space = (s[-1] in ' \n\t\r')
if remove_newline:
newline_re = re.compile('[\r\n]+')
s = ' '.join(filter(bool, newline_re.split(s)))
s = ' '.join(filter(bool, s.split('\t')))
s = ' '.join(filter(bool, s.split(' ')))
if starts_with_space:
s = ' ' + s
if ends_with_space:
s += ' '
return s
string = string.replace('\\n\\n', '\n\n')
string = string.replace('\\n', ' ')
string = string.replace('\\"', '"')
string = string.replace("\'", "'")
string = normalize_whitespace(string)
string = string.strip('\n')
string = string.strip('\t')
return string
def get_locale_path(cog_folder: Path, extension: str) -> Path:
"""
Gets the folder path containing localization files.
:param Path cog_folder:
The cog folder that we want localizations for.
:param str extension:
Extension of localization files.
:return:
Path of possible localization file, it may not exist.
"""
return cog_folder / 'locales' / "{}.{}".format(get_locale(), extension)
class CogI18n:
def __init__(self, name, file_location):
"""
Initializes the internationalization object for a given cog.
:param name: Your cog name.
:param file_location:
This should always be ``__file__`` otherwise your localizations
will not load.
"""
self.cog_folder = Path(file_location).resolve().parent
self.cog_name = name
self.translations = {}
_i18n_cogs.update({self.cog_name: self})
self.load_translations()
def __call__(self, untranslated: str):
normalized_untranslated = _normalize(untranslated, True)
try:
return self.translations[normalized_untranslated]
except KeyError:
return untranslated
def load_translations(self):
"""
Loads the current translations for this cog.
"""
self.translations = {}
translation_file = None
locale_path = get_locale_path(self.cog_folder, 'po')
try:
try:
translation_file = locale_path.open('ru')
except ValueError: # We are using Windows
translation_file = locale_path.open('r')
self._parse(translation_file)
except (IOError, FileNotFoundError): # The translation is unavailable
pass
finally:
if translation_file is not None:
translation_file.close()
def _parse(self, translation_file):
self.translations = {}
for translation in _parse(translation_file):
self._add_translation(*translation)
def _add_translation(self, untranslated, translated):
untranslated = _normalize(untranslated, True)
translated = _normalize(translated)
if translated:
self.translations.update({untranslated: translated})

55
redbot/core/json_io.py Normal file
View File

@@ -0,0 +1,55 @@
import functools
import json
import os
import asyncio
import logging
from uuid import uuid4
# This is basically our old DataIO, except that it's now threadsafe
# and just a base for much more elaborate classes
from pathlib import Path
log = logging.getLogger("red")
PRETTY = {"indent": 4, "sort_keys": True, "separators": (',', ' : ')}
MINIFIED = {"sort_keys": True, "separators": (',', ':')}
class JsonIO:
"""Basic functions for atomic saving / loading of json files"""
def __init__(self, path: Path=Path.cwd()):
"""
:param path: Full path to file.
"""
self._lock = asyncio.Lock()
self.path = path
# noinspection PyUnresolvedReferences
def _save_json(self, data, settings=PRETTY):
log.debug("Saving file {}".format(self.path))
filename = self.path.stem
tmp_file = "{}-{}.tmp".format(filename, uuid4().fields[0])
tmp_path = self.path.parent / tmp_file
with tmp_path.open(encoding="utf-8", mode="w") as f:
json.dump(data, f, **settings)
tmp_path.replace(self.path)
async def _threadsafe_save_json(self, data, settings=PRETTY):
loop = asyncio.get_event_loop()
func = functools.partial(self._save_json, data, settings)
with await self._lock:
await loop.run_in_executor(None, func)
# noinspection PyUnresolvedReferences
def _load_json(self):
log.debug("Reading file {}".format(self.path))
with self.path.open(encoding='utf-8', mode="r") as f:
data = json.load(f)
return data
async def _threadsafe_load_json(self, path):
loop = asyncio.get_event_loop()
func = functools.partial(self._load_json, path)
task = loop.run_in_executor(None, func)
with await self._lock:
return await asyncio.wait_for(task)

View File

245
redbot/core/locales/es.po Normal file
View File

@@ -0,0 +1,245 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR ORGANIZATION
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
msgid ""
msgstr ""
"Project-Id-Version: \n"
"POT-Creation-Date: 2017-08-26 18:11+EDT\n"
"PO-Revision-Date: 2017-08-27 09:02-0600\n"
"Language-Team: \n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: pygettext.py 1.5\n"
"X-Generator: Poedit 2.0.3\n"
"Last-Translator: UltimatePancake <pier.gaetani@gmail.com>\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
"Language: es\n"
#: ../cog_manager.py:196
msgid ""
"Install Path: {}\n"
"\n"
msgstr ""
"Ubicación de instalación: {}\n"
"\n"
#: ../cog_manager.py:212
msgid "That path is does not exist or does not point to a valid directory."
msgstr "Esa ubicación no existe o no apunta a un directorio válido."
#: ../cog_manager.py:221
msgid "Path successfully added."
msgstr "Ubicación agregada exitósamente."
#: ../cog_manager.py:234
msgid "That is an invalid path number."
msgstr "Número de ubicación inválido."
#: ../cog_manager.py:238
msgid "Path successfully removed."
msgstr "Ubicación eliminada exitósamente."
#: ../cog_manager.py:254
msgid "Invalid 'from' index."
msgstr "Índice 'from' inválido."
#: ../cog_manager.py:260
msgid "Invalid 'to' index."
msgstr "Índice 'to' inválido."
#: ../cog_manager.py:264
msgid "Paths reordered."
msgstr "Ubicaciones reordenadas."
#: ../cog_manager.py:282
msgid "That path does not exist."
msgstr "Ubicación inexistente."
#: ../cog_manager.py:286
msgid "The bot will install new cogs to the `{}` directory."
msgstr "El bot instalará nuevos cogs en el directorio `{}`."
#: ../core_commands.py:35
msgid "No module by that name was found in any cog path."
msgstr "No existe módulo con ese nombre en ninguna ubicación."
#: ../core_commands.py:43
msgid "Failed to load package. Check your console or logs for details."
msgstr "Error cargando paquete. Revisar consola o bitácora para más detalles."
#: ../core_commands.py:47 ../core_commands.py:56 ../core_commands.py:76
#: ../core_commands.py:151 ../core_commands.py:212 ../core_commands.py:226
msgid "Done."
msgstr "Terminado."
#: ../core_commands.py:58
msgid "That extension is not loaded."
msgstr "Esa extensión no está cargada."
#: ../core_commands.py:71
msgid "Failed to reload package. Check your console or logs for details."
msgstr ""
"Error recargando paquete. Revisar consola o bitácora para más detalles."
#: ../core_commands.py:86
msgid "Shutting down... "
msgstr "Apagando..."
#: ../core_commands.py:123
msgid "The admin role for this guild has been set."
msgstr "El rol de administrador para este gremio ha sido configurado."
#: ../core_commands.py:131
msgid "The mod role for this guild has been set."
msgstr "El rol de moderador para este gremio ha sido configurado."
#: ../core_commands.py:145
msgid ""
"Failed. Remember that you can edit my avatar up to two times a hour. The URL "
"must be a direct link to a JPG / PNG."
msgstr ""
"Error. Recordar que sólo se puede editar el avatar hasta dos veces por hora. "
"La URL debe ser un enlace directo a un JPG o PNG."
#: ../core_commands.py:149
msgid "JPG / PNG format only."
msgstr "Únicamente formatos JPG o PNG."
#: ../core_commands.py:161
msgid "Game set."
msgstr "Juego configurado."
#: ../core_commands.py:190
msgid "Status changed to %s."
msgstr "Estado cambiado a %s."
#: ../core_commands.py:221
msgid ""
"Failed to change name. Remember that you can only do it up to 2 times an "
"hour. Use nicknames if you need frequent changes. `{}set nickname`"
msgstr ""
"Error cambiando nombre. Recordar que sólo se puede hacer hasta dos veces por "
"hora. Usar sobrenombres si cambios frecuentes son necesarios. `{}set "
"nickname`"
#: ../core_commands.py:236
msgid "I lack the permissions to change my own nickname."
msgstr "Carezco de permisos para cambiar mi sobrenombre."
#: ../core_commands.py:250 ../core_commands.py:263
msgid "Prefix set."
msgstr "Prefijo configurado."
#: ../core_commands.py:259
msgid "Guild prefixes have been reset."
msgstr "Prefijos de gremio han sido restaurados."
#: ../core_commands.py:282
msgid ""
"\n"
"Verification token:"
msgstr ""
"\n"
"Ficha de verificación:"
#: ../core_commands.py:285
msgid "Remember:\n"
msgstr "Recordar:\n"
#: ../core_commands.py:288
msgid ""
"I have printed a one-time token in the console. Copy and paste it here to "
"confirm you are the owner."
msgstr ""
"He impreso una ficha única en la consola. Copiar y pegarlo aca para "
"confirmar propiedad."
#: ../core_commands.py:296
msgid "The set owner request has timed out."
msgstr "Tiempo de espera para configuración de dueño ha terminado."
#: ../core_commands.py:302
msgid "You have been set as owner."
msgstr "Has sido configurado como dueño."
#: ../core_commands.py:304
msgid "Invalid token."
msgstr "Ficha inválida."
#: ../core_commands.py:313
msgid "Locale has been set."
msgstr "Localización configurada."
#: ../core_commands.py:323
msgid "User ID: %s"
msgstr "ID de usuario: %s"
#: ../core_commands.py:326
msgid "through DM"
msgstr "a través de mensaje privado"
#: ../core_commands.py:328
msgid "from {}"
msgstr "de {}"
#: ../core_commands.py:329
msgid " | Server ID: %s"
msgstr " | ID de servidor: %s"
#: ../core_commands.py:337
msgid "Use `{}dm {} <text>` to reply to this user"
msgstr "Utilizar `{}dm {} <texto>` para responder a este usuario"
#: ../core_commands.py:345
msgid "Sent by {} {}"
msgstr "Enviado por {} {}"
#: ../core_commands.py:357
msgid "I cannot send your message, I'm unable to find my owner... *sigh*"
msgstr "Error enviando mensaje, no encuentro a mi dueño... *suspiro*"
#: ../core_commands.py:360
msgid "I'm unable to deliver your message. Sorry."
msgstr "No puedo enviar tu mensaje, perdón."
#: ../core_commands.py:362
msgid "Your message has been sent."
msgstr "Tu mensaje ha sido enviado."
#: ../core_commands.py:376
msgid ""
"Invalid ID or user not found. You can only send messages to people I share a "
"server with."
msgstr ""
"ID inválido o usuario no encontrad. Solo puedes enviar mensajes a personas "
"con quienes compartes un servidor."
#: ../core_commands.py:382
msgid "Owner of %s"
msgstr "Dueño de %s"
#: ../core_commands.py:385
msgid "You can reply to this message with %scontact"
msgstr "Puedes contestar este mensaje utilizando %scontact"
#: ../core_commands.py:395
msgid "Sorry, I couldn't deliver your message to %s"
msgstr "Perdón, no pude enviar tu mensaje para %s"
#: ../core_commands.py:398
msgid "Message delivered to %s"
msgstr "Mensaje enviado a %s"
#: ../dev_commands.py:165
msgid "Already running a REPL session in this channel. Exit it with `quit`."
msgstr "Una sesión de REPL ya esta activa en este canal. Salir con `quit`."
#: ../dev_commands.py:169
msgid "Enter code to execute or evaluate. `exit()` or `quit` to exit."
msgstr "Ingresar codigo a ejecutar o evaluar. `exit()` o `quit` para salir."
#: ../dev_commands.py:234
msgid "Unexpected error: `{}`"
msgstr "Error inesperado: `{}`"

View File

@@ -0,0 +1,223 @@
# SOME DESCRIPTIVE TITLE.
# Copyright (C) YEAR ORGANIZATION
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"POT-Creation-Date: 2017-08-26 18:11+EDT\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=CHARSET\n"
"Content-Transfer-Encoding: ENCODING\n"
"Generated-By: pygettext.py 1.5\n"
#: ../cog_manager.py:196
msgid ""
"Install Path: {}\n"
"\n"
msgstr ""
#: ../cog_manager.py:212
msgid "That path is does not exist or does not point to a valid directory."
msgstr ""
#: ../cog_manager.py:221
msgid "Path successfully added."
msgstr ""
#: ../cog_manager.py:234
msgid "That is an invalid path number."
msgstr ""
#: ../cog_manager.py:238
msgid "Path successfully removed."
msgstr ""
#: ../cog_manager.py:254
msgid "Invalid 'from' index."
msgstr ""
#: ../cog_manager.py:260
msgid "Invalid 'to' index."
msgstr ""
#: ../cog_manager.py:264
msgid "Paths reordered."
msgstr ""
#: ../cog_manager.py:282
msgid "That path does not exist."
msgstr ""
#: ../cog_manager.py:286
msgid "The bot will install new cogs to the `{}` directory."
msgstr ""
#: ../core_commands.py:35
msgid "No module by that name was found in any cog path."
msgstr ""
#: ../core_commands.py:43
msgid "Failed to load package. Check your console or logs for details."
msgstr ""
#: ../core_commands.py:47 ../core_commands.py:56 ../core_commands.py:76
#: ../core_commands.py:151 ../core_commands.py:212 ../core_commands.py:226
msgid "Done."
msgstr ""
#: ../core_commands.py:58
msgid "That extension is not loaded."
msgstr ""
#: ../core_commands.py:71
msgid "Failed to reload package. Check your console or logs for details."
msgstr ""
#: ../core_commands.py:86
msgid "Shutting down... "
msgstr ""
#: ../core_commands.py:123
msgid "The admin role for this guild has been set."
msgstr ""
#: ../core_commands.py:131
msgid "The mod role for this guild has been set."
msgstr ""
#: ../core_commands.py:145
msgid "Failed. Remember that you can edit my avatar up to two times a hour. The URL must be a direct link to a JPG / PNG."
msgstr ""
#: ../core_commands.py:149
msgid "JPG / PNG format only."
msgstr ""
#: ../core_commands.py:161
msgid "Game set."
msgstr ""
#: ../core_commands.py:190
msgid "Status changed to %s."
msgstr ""
#: ../core_commands.py:221
msgid "Failed to change name. Remember that you can only do it up to 2 times an hour. Use nicknames if you need frequent changes. `{}set nickname`"
msgstr ""
#: ../core_commands.py:236
msgid "I lack the permissions to change my own nickname."
msgstr ""
#: ../core_commands.py:250 ../core_commands.py:263
msgid "Prefix set."
msgstr ""
#: ../core_commands.py:259
msgid "Guild prefixes have been reset."
msgstr ""
#: ../core_commands.py:282
msgid ""
"\n"
"Verification token:"
msgstr ""
#: ../core_commands.py:285
msgid ""
"Remember:\n"
msgstr ""
#: ../core_commands.py:288
msgid "I have printed a one-time token in the console. Copy and paste it here to confirm you are the owner."
msgstr ""
#: ../core_commands.py:296
msgid "The set owner request has timed out."
msgstr ""
#: ../core_commands.py:302
msgid "You have been set as owner."
msgstr ""
#: ../core_commands.py:304
msgid "Invalid token."
msgstr ""
#: ../core_commands.py:313
msgid "Locale has been set."
msgstr ""
#: ../core_commands.py:323
msgid "User ID: %s"
msgstr ""
#: ../core_commands.py:326
msgid "through DM"
msgstr ""
#: ../core_commands.py:328
msgid "from {}"
msgstr ""
#: ../core_commands.py:329
msgid " | Server ID: %s"
msgstr ""
#: ../core_commands.py:337
msgid "Use `{}dm {} <text>` to reply to this user"
msgstr ""
#: ../core_commands.py:345
msgid "Sent by {} {}"
msgstr ""
#: ../core_commands.py:357
msgid "I cannot send your message, I'm unable to find my owner... *sigh*"
msgstr ""
#: ../core_commands.py:360
msgid "I'm unable to deliver your message. Sorry."
msgstr ""
#: ../core_commands.py:362
msgid "Your message has been sent."
msgstr ""
#: ../core_commands.py:376
msgid "Invalid ID or user not found. You can only send messages to people I share a server with."
msgstr ""
#: ../core_commands.py:382
msgid "Owner of %s"
msgstr ""
#: ../core_commands.py:385
msgid "You can reply to this message with %scontact"
msgstr ""
#: ../core_commands.py:395
msgid "Sorry, I couldn't deliver your message to %s"
msgstr ""
#: ../core_commands.py:398
msgid "Message delivered to %s"
msgstr ""
#: ../dev_commands.py:165
msgid "Already running a REPL session in this channel. Exit it with `quit`."
msgstr ""
#: ../dev_commands.py:169
msgid "Enter code to execute or evaluate. `exit()` or `quit` to exit."
msgstr ""
#: ../dev_commands.py:234
msgid "Unexpected error: `{}`"
msgstr ""

View File

@@ -0,0 +1,17 @@
import subprocess
TO_TRANSLATE = [
'../cog_manager.py',
'../core_commands.py',
'../dev_commands.py'
]
def regen_messages():
subprocess.run(
['pygettext', '-n'] + TO_TRANSLATE
)
if __name__ == "__main__":
regen_messages()

View File

@@ -0,0 +1,43 @@
from raven import Client, breadcrumbs
from raven.handlers.logging import SentryHandler
from redbot.core import __version__
__all__ = ("init_sentry_logging", "should_log")
include_paths = (
'core',
'cogs.alias',
'cogs.audio',
'cogs.downloader',
'cogs.economy',
'cogs.general',
'cogs.image',
'cogs.streams',
'cogs.trivia',
'cogs.utils',
'tests.core.test_sentry',
'main',
'launcher'
)
client = None
def init_sentry_logging(logger):
global client
client = Client(
dsn=("https://27f3915ba0144725a53ea5a99c9ae6f3:87913fb5d0894251821dcf06e5e9cfe6@"
"sentry.telemetry.red/19?verify_ssl=0"),
release=__version__
)
breadcrumbs.ignore_logger("websockets")
breadcrumbs.ignore_logger("websockets.protocol")
handler = SentryHandler(client)
logger.addHandler(handler)
def should_log(module_name: str) -> bool:
return any(module_name.startswith(path) for path in include_paths)

View File

View File

@@ -0,0 +1,79 @@
def error(text):
return "\N{NO ENTRY SIGN} {}".format(text)
def warning(text):
return "\N{WARNING SIGN} {}".format(text)
def info(text):
return "\N{INFORMATION SOURCE} {}".format(text)
def question(text):
return "\N{BLACK QUESTION MARK ORNAMENT} {}".format(text)
def bold(text):
return "**{}**".format(text)
def box(text, lang=""):
ret = "```{}\n{}\n```".format(lang, text)
return ret
def inline(text):
return "`{}`".format(text)
def italics(text):
return "*{}*".format(text)
def pagify(text, delims=["\n"], *, escape_mass_mentions=True, shorten_by=8,
page_length=2000):
"""DOES NOT RESPECT MARKDOWN BOXES OR INLINE CODE"""
in_text = text
page_length -= shorten_by
while len(in_text) > page_length:
this_page_len = page_length
if escape_mass_mentions:
this_page_len -= (in_text.count("@here", 0, page_length) +
in_text.count("@everyone", 0, page_length))
closest_delim = max([in_text.rfind(d, 1, this_page_len)
for d in delims])
closest_delim = closest_delim if closest_delim != -1 else this_page_length
if escape_mass_mentions:
to_send = escape(in_text[:closest_delim], mass_mentions=True)
else:
to_send = in_text[:closest_delim]
if len(to_send.strip()) > 0:
yield to_send
in_text = in_text[closest_delim:]
if len(in_text.strip()) > 0:
if escape_mass_mentions:
yield escape(in_text, mass_mentions=True)
else:
yield in_text
def strikethrough(text):
return "~~{}~~".format(text)
def underline(text):
return "__{}__".format(text)
def escape(text, *, mass_mentions=False, formatting=False):
if mass_mentions:
text = text.replace("@everyone", "@\u200beveryone")
text = text.replace("@here", "@\u200bhere")
if formatting:
text = (text.replace("`", "\\`")
.replace("*", "\\*")
.replace("_", "\\_")
.replace("~", "\\~"))
return text