First commit

This commit is contained in:
Twentysix 2017-04-27 00:49:27 +02:00
parent 6251c585e4
commit 2063decbe7
20 changed files with 994 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
*.json
*.pyc
__pycache__
*.exe
*.dll
*.log

4
README.md Normal file
View File

@ -0,0 +1,4 @@
# Red - Discord Bot v3
**This is alpha. Regular use is not recommended.
There will not be any effort not to break current installations.**

5
cogs/audio/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .audio import Audio
def setup(bot):
bot.add_cog(Audio(bot))

112
cogs/audio/audio.py Normal file
View File

@ -0,0 +1,112 @@
from discord.ext import commands
from discord import FFmpegPCMAudio, PCMVolumeTransformer
import os
import youtube_dl
import discord
# Just a little experimental audio cog not meant for final release
class Audio:
"""Audio commands"""
def __init__(self, bot):
self.bot = bot
@commands.command()
async def local(self, ctx, *, filename: str):
"""Play mp3"""
if ctx.author.voice is None:
await ctx.send("Join a voice channel first!")
return
if ctx.guild.voice_client:
if ctx.guild.voice_client.channel != ctx.author.voice.channel:
await ctx.guild.voice_client.disconnect()
path = os.path.join("cogs", "audio", "songs", filename + ".mp3")
if not os.path.isfile(path):
await ctx.send("Let's play a file that exists pls")
return
player = PCMVolumeTransformer(FFmpegPCMAudio(path), volume=1)
voice = await ctx.author.voice.channel.connect()
voice.play(player)
await ctx.send("{} is playing a song...".format(ctx.author))
@commands.command()
async def play(self, ctx, url: str):
"""Play youtube url"""
url = url.strip("<").strip(">")
if ctx.author.voice is None:
await ctx.send("Join a voice channel first!")
return
elif "youtube.com" not in url.lower():
await ctx.send("Youtube links pls")
return
if ctx.guild.voice_client:
if ctx.guild.voice_client.channel != ctx.author.voice.channel:
await ctx.guild.voice_client.disconnect()
yt = YoutubeSource(url)
player = PCMVolumeTransformer(yt, volume=1)
voice = await ctx.author.voice.channel.connect()
voice.play(player)
await ctx.send("{} is playing a song...".format(ctx.author))
@commands.command()
async def stop(self, ctx):
"""Stops the music and disconnects"""
if ctx.guild.voice_client:
ctx.guild.voice_client.source.cleanup()
await ctx.guild.voice_client.disconnect()
else:
await ctx.send("I'm not even connected to a voice channel!", delete_after=2)
await ctx.message.delete()
@commands.command()
async def pause(self, ctx):
"""Pauses the music"""
if ctx.guild.voice_client:
ctx.guild.voice_client.pause()
await ctx.send("👌", delete_after=2)
else:
await ctx.send("I'm not even connected to a voice channel!", delete_after=2)
await ctx.message.delete()
@commands.command()
async def resume(self, ctx):
"""Resumes the music"""
if ctx.guild.voice_client:
ctx.guild.voice_client.resume()
await ctx.send("👌", delete_after=2)
else:
await ctx.send("I'm not even connected to a voice channel!", delete_after=2)
await ctx.message.delete()
@commands.command(hidden=True)
async def volume(self, ctx, n: float):
"""Sets the volume"""
if ctx.guild.voice_client:
ctx.guild.voice_client.source.volume = n
await ctx.send("Volume set.", delete_after=2)
else:
await ctx.send("I'm not even connected to a voice channel!", delete_after=2)
await ctx.message.delete()
def __unload(self):
for vc in self.bot.voice_clients:
if vc.source:
vc.source.cleanup()
self.bot.loop.create_task(vc.disconnect())
class YoutubeSource(discord.FFmpegPCMAudio):
def __init__(self, url):
opts = {
'format': 'webm[abr>0]/bestaudio/best',
'prefer_ffmpeg': True,
'quiet': True
}
ytdl = youtube_dl.YoutubeDL(opts)
self.info = ytdl.extract_info(url, download=False)
super().__init__(self.info['url'])

4
core/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .owner import Owner
def setup(bot):
bot.add_cog(Owner(bot))

61
core/bot.py Normal file
View File

@ -0,0 +1,61 @@
from discord.ext import commands
from collections import Counter
from core.utils.helpers import JsonGuildDB
import os
class Red(commands.Bot):
def __init__(self, cli_flags, **kwargs):
self._shutdown_mode = None
self.db = JsonGuildDB("core/data/settings.json",
autosave=True,
create_dirs=True)
def prefix_manager(bot, message):
global_prefix = self.db.get_global("prefix", [])
if message.guild is None:
return global_prefix
server_prefix = self.db.get(message.guild, "prefix", [])
return server_prefix if server_prefix else global_prefix
# Priority: args passed > cli flags > db
if "command_prefix" not in kwargs:
if cli_flags.prefix:
kwargs["command_prefix"] = lambda bot, message: cli_flags.prefix
else:
kwargs["command_prefix"] = None
if kwargs["command_prefix"] is None:
kwargs["command_prefix"] = prefix_manager
self.counter = Counter()
self.uptime = None
super().__init__(**kwargs)
async def is_owner(self, user, allow_coowners=True):
if allow_coowners:
if user.id in self.settings.coowners:
return True
return await super().is_owner(user)
async def send_cmd_help(self, ctx):
if ctx.invoked_subcommand:
pages = await self.formatter.format_help_for(ctx, ctx.invoked_subcommand)
for page in pages:
await ctx.send(page)
else:
pages = await self.formatter.format_help_for(ctx, ctx.command)
for page in pages:
await ctx.send(page)
async def logout(self, *, restart=False):
"""Gracefully quits Red with exit code 0
If restart is True, the exit code will be 26 instead
Upon receiving that exit code, the launcher restarts Red"""
self._shutdown_mode = not restart
await super().logout()
def list_packages(self):
"""Lists packages present in the cogs the folder"""
return os.listdir("cogs")

51
core/db.py Normal file
View File

@ -0,0 +1,51 @@
import asyncio
import functools
import json
import os
from uuid import uuid4
DEFAULT_JSON_SETTINGS = {
"indent": 4,
"sort_keys": True,
"separators": (',', ' : ')
}
class JSONAutosave:
def __init__(self, interval=5, **settings):
self.interval = interval
self._queue = {}
self._lock = asyncio.Lock()
self._json_settings = settings.pop("json_settings",
DEFAULT_JSON_SETTINGS)
self.loop = asyncio.get_event_loop()
self.task = self.loop.create_task(self._process_queue())
def add_to_queue(self, path, data):
self._queue[path] = data
async def _process_queue(self):
try:
while True:
print("looping")
queue = self._queue.copy()
self._queue = {}
for path, data in queue.items():
with await self._lock:
func = functools.partial(self._save_json, path, data)
try:
await self.loop.run_in_executor(None, func)
except Exception as e:
print(e) # Proper logging here
await asyncio.sleep(self.interval)
except asyncio.CancelledError:
pass
def _save_json(self, path, data):
print("Saving " + path)
path, _ = os.path.splitext(path)
tmp_file = "{}-{}.tmp".format(path, uuid4().fields[0])
with open(tmp_file, encoding="utf-8", mode="w") as f:
json.dump(data, f, **self._json_settings)
os.replace(tmp_file, path)

85
core/events.py Normal file
View File

@ -0,0 +1,85 @@
import discord
import traceback
import datetime
import logging
from discord.ext import commands
from core.utils.chat_formatting import inline
log = logging.getLogger("red")
def init_events(bot):
@bot.event
async def on_connect():
if bot.uptime is None:
print("Connected to Discord. Getting ready...")
@bot.event
async def on_ready():
if bot.uptime is None:
bot.uptime = datetime.datetime.utcnow()
print("Loading cogs...")
# load the packages at this point
total_channels = len([c for c in bot.get_all_channels()])
total_users = len(set([m for m in bot.get_all_members()]))
print("Ready and operational on {} servers.\n"
"".format(len(bot.guilds)))
@bot.event
async def on_command_error(error, ctx):
if isinstance(error, commands.MissingRequiredArgument):
await bot.send_cmd_help(ctx)
elif isinstance(error, commands.BadArgument):
await bot.send_cmd_help(ctx)
elif isinstance(error, commands.DisabledCommand):
await ctx.send("That command is disabled.")
elif isinstance(error, commands.CommandInvokeError):
# Need to test if the following still works
"""
no_dms = "Cannot send messages to this user"
is_help_cmd = ctx.command.qualified_name == "help"
is_forbidden = isinstance(error.original, discord.Forbidden)
if is_help_cmd and is_forbidden and error.original.text == no_dms:
msg = ("I couldn't send the help message to you in DM. Either"
" you blocked me or you disabled DMs in this server.")
await ctx.send(msg)
return
"""
log.exception("Exception in command '{}'"
"".format(ctx.command.qualified_name),
exc_info=error.original)
message = ("Error in command '{}'. Check your console or "
"logs for details."
"".format(ctx.command.qualified_name))
exception_log = ("Exception in command '{}'\n"
"".format(ctx.command.qualified_name))
exception_log += "".join(traceback.format_exception(type(error),
error, error.__traceback__))
bot._last_exception = exception_log
await ctx.send(inline(message))
elif isinstance(error, commands.CommandNotFound):
pass
elif isinstance(error, commands.CheckFailure):
await ctx.send("")
elif isinstance(error, commands.NoPrivateMessage):
await ctx.send("That command is not available in DMs.")
elif isinstance(error, commands.CommandOnCooldown):
await ctx.send("This command is on cooldown. "
"Try again in {:.2f}s"
"".format(error.retry_after))
else:
log.exception(type(error).__name__, exc_info=error)
@bot.event
async def on_message(message):
bot.counter["messages_read"] += 1
await bot.process_commands(message)
@bot.event
async def on_resumed():
bot.counter["sessions_resumed"] += 1
@bot.event
async def on_command(command):
bot.counter["processed_commands"] += 1

30
core/global_checks.py Normal file
View File

@ -0,0 +1,30 @@
def init_global_checks(bot):
@bot.check
async def global_perms(ctx):
if await bot.is_owner(ctx.author):
return True
if bot.db.get_global("whitelist", []):
return ctx.author.id in bot.db.get_global("whitelist", [])
return ctx.author.id not in bot.db.get_global("blacklist", [])
@bot.check
async def local_perms(ctx):
if await bot.is_owner(ctx.author):
return True
elif ctx.message.guild is None:
return True
guild_perms = bot.db.get_all(ctx.guild, {})
local_blacklist = guild_perms.get("blacklist", [])
local_whitelist = guild_perms.get("whitelist", [])
if local_whitelist:
return ctx.author.id in local_whitelist
return ctx.author.id not in local_blacklist
@bot.check
async def bots(ctx):
return not ctx.author.bot

View File

75
core/json_flusher.py Normal file
View File

@ -0,0 +1,75 @@
import asyncio
import logging
from core.json_io import JsonIO, PRETTY
# This is where individual cogs can queue low priority writes to files
#
# Only the last queued write to a file actually gets executed.
# This helps considerably in reducing the total writes (especially in poorly
# coded cogs that would otherwise hammer the system with them)
#
# The flusher is used by the DB helpers in autosave mode
#
# The JSONFlusher class is supposed to be instanced only once, at boot
log = logging.getLogger("red")
_flusher = None
class JSONFlusher(JsonIO):
def __init__(self, interval=5, **settings):
self.interval = interval
self._queue = {}
self._lock = asyncio.Lock()
self._json_settings = settings.pop("json_settings", PRETTY)
self._loop = asyncio.get_event_loop()
self.task = self._loop.create_task(self._process_queue())
def add_to_queue(self, path, data):
"""Schedules a json file for later write
Calling this function multiple times with the same path will
result in only the last one getting scheduled"""
self._queue[path] = data
def remove_from_queue(self, path):
"""Removes json file from the writing queue"""
try:
del self._queue[path]
except:
pass
async def _process_queue(self):
try:
while True:
queue = self._queue.copy()
self._queue = {}
for path, data in queue.items():
with await self._lock:
try:
await self._threadsafe_save_json(path,
data,
self._json_settings)
except Exception as e:
log.critical("Flusher failed to write: {}"
"".format(e))
await asyncio.sleep(self.interval)
except asyncio.CancelledError:
if self._queue:
log.warning("Flusher interrupted with "
"non-empty queue")
else:
log.debug("Flusher shutting down.")
def init_flusher():
"""Instances the flusher and initializes its task"""
global _flusher
_flusher = JSONFlusher()
def get_flusher():
"""Returns the global flusher instance"""
if _flusher is None:
raise RuntimeError("The flusher has not been initialized.")
return _flusher

46
core/json_io.py Normal file
View File

@ -0,0 +1,46 @@
import functools
import json
import os
import asyncio
import logging
from uuid import uuid4
# This is basically our old DataIO, except that it's now threadsafe
# and just a base for much more elaborate classes
log = logging.getLogger("red")
PRETTY = {"indent": 4, "sort_keys": True, "separators": (',', ' : ')}
MINIFIED = {"sort_keys": True, "separators": (',', ':')}
class JsonIO:
"""Basic functions for atomic saving / loading of json files
This is inherited by the flusher and db helpers"""
def _save_json(self, path, data, settings=PRETTY):
log.debug("Saving file {}".format(path))
filename, _ = os.path.splitext(path)
tmp_file = "{}-{}.tmp".format(filename, uuid4().fields[0])
with open(tmp_file, encoding="utf-8", mode="w") as f:
json.dump(data, f, **settings)
os.replace(tmp_file, path)
async def _threadsafe_save_json(self, path, data, settings=PRETTY):
loop = asyncio.get_event_loop()
func = functools.partial(self._save_json, path, data, settings)
await loop.run_in_executor(None, func)
def _load_json(self, path):
log.debug("Reading file {}".format(path))
with open(path, encoding='utf-8', mode="r") as f:
data = json.load(f)
return data
async def _threadsafe_load_json(self, path):
loop = asyncio.get_event_loop()
func = functools.partial(self._load_json, path)
task = loop.run_in_executor(None, func)
return await asyncio.wait_for(task)

115
core/owner.py Normal file
View File

@ -0,0 +1,115 @@
from discord.ext import commands
from core.utils import checks
from core.utils.chat_formatting import box
import asyncio
import importlib
import os
import discord
class Owner:
"""All owner-only commands that relate to debug bot operations."""
def __init__(self, bot):
self.bot = bot
@commands.command()
@checks.is_owner()
async def load(self, ctx, *, cog_name: str):
"""Loads a package"""
if not cog_name.startswith("cogs."):
cog_name = "cogs." + cog_name
self.bot.load_extension(cog_name)
await ctx.send("Done.")
@commands.group()
@checks.is_owner()
async def unload(self, ctx, *, cog_name: str):
"""Unloads a package"""
if not cog_name.startswith("cogs."):
cog_name = "cogs." + cog_name
if cog_name in self.bot.extensions:
self.bot.unload_extension(cog_name)
await ctx.send("Done.")
else:
await ctx.send("That extension is not loaded.")
@commands.command(name="reload")
@checks.is_owner()
async def _reload(self, ctx, *, cog_name: str):
"""Reloads a package"""
if not cog_name.startswith("cogs."):
cog_name = "cogs." + cog_name
self.refresh_modules(cog_name)
self.bot.unload_extension(cog_name)
self.bot.load_extension(cog_name)
await ctx.send("Done.")
def refresh_modules(self, module):
"""Interally reloads modules so that changes are detected"""
module = module.replace(".", os.sep)
for root, dirs, files in os.walk(module):
for name in files:
if name.endswith(".py"):
path = os.path.join(root, name)
path, _ = os.path.splitext(path)
path = ".".join(path.split(os.sep))
print("Reloading " + path)
m = importlib.import_module(path)
importlib.reload(m)
@commands.command(hidden=True)
@checks.is_owner()
async def debug(self, ctx, *, code):
"""Evaluates code"""
author = ctx.message.author
channel = ctx.message.channel
code = code.strip('` ')
result = None
global_vars = globals().copy()
global_vars['bot'] = self.bot
global_vars['ctx'] = ctx
global_vars['message'] = ctx.message
global_vars['author'] = ctx.message.author
global_vars['channel'] = ctx.message.channel
global_vars['guild'] = ctx.message.guild
try:
result = eval(code, global_vars, locals())
except Exception as e:
await ctx.send('```py\n{}: {}```'.format(type(e).__name__, str(e)),)
return
if asyncio.iscoroutine(result):
result = await result
result = str(result)
if ctx.message.guild is not None:
token = ctx.bot.http.token
r = "[EXPUNGED]"
result = result.replace(token, r)
result = result.replace(token.lower(), r)
result = result.replace(token.upper(), r)
await ctx.send(box(result, lang="py"))
@commands.command(hidden=True)
@checks.is_owner()
async def mock(self, ctx, user: discord.Member, *, command):
"""Runs a command as if it was issued by a different user
The prefix must not be entered"""
# Since we have stateful objects now this might be pretty bad
# Sorry Danny
old_author = ctx.author
old_content = ctx.message.content
ctx.message.author = user
ctx.message.content = ctx.prefix + command
await self.bot.process_commands(ctx.message)
ctx.message.author = old_author
ctx.message.content = old_content

57
core/settings.py Normal file
View File

@ -0,0 +1,57 @@
import argparse
# Do we even need a Settings class this time? To be decided
class Settings:
def __init__(self):
args = {}
self.coowners = []
def can_login(self):
"""Used on start to determine if Red is setup enough to login"""
raise NotImplementedError
def parse_cli_flags():
parser = argparse.ArgumentParser(description="Red - Discord Bot")
parser.add_argument("--owner", help="ID of the owner. Only who hosts "
"Red should be owner, this has "
"security implications")
parser.add_argument("--prefix", "-p", action="append",
help="Global prefix. Can be multiple")
parser.add_argument("--no-prompt",
action="store_true",
help="Disables console inputs. Features requiring "
"console interaction could be disabled as a "
"result")
parser.add_argument("--no-cogs",
action="store_true",
help="Starts Red with no cogs loaded, only core")
parser.add_argument("--self-bot",
action='store_true',
help="Specifies if Red should log in as selfbot")
parser.add_argument("--not-bot",
action='store_true',
help="Specifies if the token used belongs to a bot "
"account.")
parser.add_argument("--dry-run",
action="store_true",
help="Makes Red quit with code 0 just before the "
"login. This is useful for testing the boot "
"process.")
parser.add_argument("--debug",
action="store_true",
help="Sets the loggers level as debug")
parser.add_argument("--dev",
action="store_true",
help="Enables developer mode")
args = parser.parse_args()
if args.prefix:
args.prefix = sorted(args.prefix, reverse=True)
else:
args.prefix = []
return args

0
core/utils/__init__.py Normal file
View File

View File

@ -0,0 +1,76 @@
def error(text):
return "\N{NO ENTRY SIGN} {}".format(text)
def warning(text):
return "\N{WARNING SIGN} {}".format(text)
def info(text):
return "\N{INFORMATION SOURCE} {}".format(text)
def question(text):
return "\N{BLACK QUESTION MARK ORNAMENT} {}".format(text)
def bold(text):
return "**{}**".format(text)
def box(text, lang=""):
ret = "```{}\n{}\n```".format(lang, text)
return ret
def inline(text):
return "`{}`".format(text)
def italics(text):
return "*{}*".format(text)
def pagify(text, delims=["\n"], *, escape=True, shorten_by=8,
page_length=2000):
"""DOES NOT RESPECT MARKDOWN BOXES OR INLINE CODE"""
in_text = text
if escape:
num_mentions = text.count("@here") + text.count("@everyone")
shorten_by += num_mentions
page_length -= shorten_by
while len(in_text) > page_length:
closest_delim = max([in_text.rfind(d, 0, page_length)
for d in delims])
closest_delim = closest_delim if closest_delim != -1 else page_length
if escape:
to_send = escape_mass_mentions(in_text[:closest_delim])
else:
to_send = in_text[:closest_delim]
yield to_send
in_text = in_text[closest_delim:]
if escape:
yield escape_mass_mentions(in_text)
else:
yield in_text
def strikethrough(text):
return "~~{}~~".format(text)
def underline(text):
return "__{}__".format(text)
def escape(text, *, mass_mentions=False, formatting=False):
if mass_mentions:
text = text.replace("@everyone", "@\u200beveryone")
text = text.replace("@here", "@\u200bhere")
if formatting:
text = (text.replace("`", "\\`")
.replace("*", "\\*")
.replace("_", "\\_")
.replace("~", "\\~"))
return text

7
core/utils/checks.py Normal file
View File

@ -0,0 +1,7 @@
from discord.ext import commands
def is_owner(**kwargs):
async def check(ctx):
return await ctx.bot.is_owner(ctx.author, **kwargs)
return commands.check(check)

200
core/utils/helpers.py Normal file
View File

@ -0,0 +1,200 @@
import os
import discord
from collections import defaultdict
from core.json_io import JsonIO
from core import json_flusher
GLOBAL_KEY = '__global__'
SENTINEL = object()
class JsonDB(JsonIO):
"""
A DB-like helper class to streamline the saving of json files
Parameters:
file_path: str
The path of the json file you want to create / access
create_dirs: bool=False
If True, it will create any missing directory leading to
the file you want to create
autosave: bool=False
If True, any change to the "database" will be queued to the
flusher and scheduled for a later write
default_value: Optional=None
Same behaviour as a defaultdict
"""
def __init__(self, file_path, **kwargs):
create_dirs = kwargs.pop("create_dirs", False)
default_value = kwargs.pop("default_value", SENTINEL)
self.autosave = kwargs.pop("autosave", False)
self.path = file_path
self._flusher = json_flusher.get_flusher()
file_exists = os.path.isfile(file_path)
if create_dirs and not file_exists:
path, _ = os.path.split(file_path)
if path:
try:
os.makedirs(path)
except FileExistsError:
pass
if file_exists:
# Might be worth looking into threadsafe ways for very large files
self._data = self._load_json(file_path)
else:
self._data = {}
self._save()
if default_value is not SENTINEL:
def _get_default():
return default_value
self._data = defaultdict(_get_default, self._data)
def set(self, key, value):
"""Sets a DB's entry"""
self._data[key] = value
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def get(self, key, default=None):
"""Returns a DB's entry"""
return self._data.get(key, default)
def remove(self, key):
"""Removes a DB's entry"""
del self._data[key]
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def pop(self, key, default=None):
"""Removes and returns a DB's entry"""
return self._data.pop(key, default)
def wipe(self):
"""Wipes DB"""
self._data = {}
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def all(self):
"""Returns all DB's data"""
return self._data
def _save(self):
"""Using this should be avoided. Let's stick to threadsafe saves"""
self._save_json(self.path, self._data)
async def save(self):
self._flusher.remove_from_queue(self.path)
await self._threadsafe_save_json(self.path, self._data)
def __contains__(self, key):
return key in self._data
def __getitem__(self, key):
return self._data[key]
def __len__(self):
return len(self._data)
def __repr__(self):
return "<{} {}>".format(self.__class__.__name__, self._data)
class JsonGuildDB(JsonDB):
"""
A DB-like helper class to streamline the saving of json files
This is a variant of JsonDB that allows for guild specific data
Global data is still allowed with dedicated methods
Same parameters as JsonDB
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def set(self, guild, key, value):
"""Sets a guild's entry"""
if not isinstance(guild, discord.Guild):
raise TypeError('Can only set guild data')
if str(guild.id) not in self._data:
self._data[str(guild.id)] = {}
self._data[str(guild.id)][key] = value
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def get(self, guild, key, default=None):
"""Returns a guild's entry"""
if not isinstance(guild, discord.Guild):
raise TypeError('Can only get guild data')
if str(guild.id) not in self._data:
return default
return self._data[str(guild.id)].get(key, default)
def remove(self, guild, key):
"""Removes a guild's entry"""
if not isinstance(guild, discord.Guild):
raise TypeError('Can only remove guild data')
if str(guild.id) not in self._data:
raise KeyError('Guild data is not present')
del self._data[str(guild.id)][key]
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def pop(self, guild, key, default=None):
"""Removes and returns a guild's entry"""
if not isinstance(guild, discord.Guild):
raise TypeError('Can only remove guild data')
return self._data.get(str(guild.id), {}).pop(key, default)
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def get_all(self, guild, default):
"""Returns all entries of a guild"""
if not isinstance(guild, discord.Guild):
raise TypeError('Can only get guild data')
return self._data.get(str(guild.id), default)
def remove_all(self, guild):
"""Removes all entries of a guild"""
if not isinstance(guild, discord.Guild):
raise TypeError('Can only remove guilds')
super().remove(str(guild.id))
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def set_global(self, key, value):
"""Sets a global value"""
if GLOBAL_KEY not in self._data:
self._data[GLOBAL_KEY] = {}
self._data[GLOBAL_KEY][key] = value
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def get_global(self, key, default=None):
"""Gets a global value"""
if GLOBAL_KEY not in self._data:
self._data[GLOBAL_KEY] = {}
return self._data[GLOBAL_KEY].get(key, default)
def remove_global(self, key):
"""Removes a global value"""
if GLOBAL_KEY not in self._data:
self._data[GLOBAL_KEY] = {}
del self._data[key]
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def pop_global(self, key, default=None):
"""Removes and returns a global value"""
if GLOBAL_KEY not in self._data:
self._data[GLOBAL_KEY] = {}
return self._data.pop(key, default)
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)

0
launcher.py Normal file
View File

60
main.py Normal file
View File

@ -0,0 +1,60 @@
from core.bot import Red
from core.global_checks import init_global_checks
from core.events import init_events
from core.json_flusher import init_flusher
from core.settings import parse_cli_flags
import logging.handlers
import logging
import os
import sys
#
# Red - Discord Bot v3
#
# Made by Twentysix, improved by many
#
def init_loggers(cli_flags):
dpy_logger = logging.getLogger("discord")
dpy_logger.setLevel(logging.WARNING)
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
dpy_logger.addHandler(console)
logger = logging.getLogger("red")
red_format = logging.Formatter(
'%(asctime)s %(levelname)s %(module)s %(funcName)s %(lineno)d: '
'%(message)s',
datefmt="[%d/%m/%Y %H:%M]")
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(red_format)
if cli_flags.debug:
os.environ['PYTHONASYNCIODEBUG'] = '1'
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)
fhandler = logging.handlers.RotatingFileHandler(
filename='red.log', encoding='utf-8', mode='a',
maxBytes=10**7, backupCount=5)
fhandler.setFormatter(red_format)
logger.addHandler(fhandler)
logger.addHandler(stdout_handler)
if __name__ == '__main__':
cli_flags = parse_cli_flags()
init_loggers(cli_flags)
init_flusher()
description = "Red v3 - Alpha"
red = Red(cli_flags, description=description, pm_help=None)
init_global_checks(red)
init_events(red)
red.load_extension('core')
if cli_flags.dev:
pass # load dev cog here?
red.run(os.environ['RED_TOKEN'], bot=not cli_flags.not_bot)